diff --git a/clients/python/README.md b/clients/python/README.md index 6bc02c4..c7406bb 100644 --- a/clients/python/README.md +++ b/clients/python/README.md @@ -33,6 +33,103 @@ with Forge(base_url="...", token="...") as forge: forge.run(prompt="...") ``` +## Multi-turn / Sessions (v0.2) + +For agent-shaped workflows that span multiple turns, use a `Session`. A session keeps acpx context across `turn()` calls — file reads, tool-call results, and conversational state persist across turns until the session is closed (or hits the server's TTL). + +### Block form (preferred — auto-close) + +```python +with forge.session(agent="claude") as s: + r1 = s.turn("Read README.md and summarize it") + print(r1.text()) # concatenated 'text' events from the model + + r2 = s.turn("Now look at the auth flow") + print(r2.duration_ms) # 12345 + for ev in r2.events: + print(ev["type"], ev.get("content")) + +# Session is closed automatically here, even if an exception was raised +# inside the block. +``` + +### Manual form (explicit lifecycle) + +For callers that need a Session to outlive a single block (background workers, cross-handler state): + +```python +s = forge.create_session(agent="claude", meta={"caller": "petalparse"}) +try: + r = s.turn("hello") +finally: + s.close() # idempotent — safe to call repeatedly, including on a + # session that was already closed by the server's TTL sweeper +``` + +`Session.close()` is idempotent: subsequent calls short-circuit without an HTTP request, and the server itself returns 200 (`already_closed: true`) on a repeat DELETE. Safe to drop in `finally` and `__exit__` blocks without `try/except`. + +### TurnResult shape + +```python +@dataclass(frozen=True) +class TurnResult: + ok: bool + session_id: str + turn_index: int # 1-based, monotonic per session + events: list[dict] # 'text' | 'tool_call' | 'thinking' | ... + stop_reason: str | None # 'end_turn' | 'timeout' | 'error' | ... + duration_ms: int + + def text(self) -> str: ... # concat of all 'text' events' content +``` + +`events` is passed through verbatim from the v0.2 server — see the server doc for the exhaustive event-type schema. `text()` is sugar for "give me the model's prose reply, ignore thinking and tool calls." + +### Listing & inspection + +```python +# All sessions for the calling token (newest first): +sessions = forge.list_sessions() +for row in sessions: + print(row["session_id"], row["turn_count"], row["closed_at"]) + +# Drop closed ones: +live = forge.list_sessions(include_closed=False) + +# Fresh state for a specific session: +state = forge.get_session("sess_abc") +# {'session_id': 'sess_abc', 'agent': 'claude', 'cwd': ..., +# 'created_at': ..., 'last_turn_at': ..., 'turn_count': 3, +# 'closed_at': None, 'live': True, 'meta': {...}} + +# Same thing from inside a block: +with forge.session() as s: + s.turn("hi") + print(s.state()["turn_count"]) +``` + +### Errors + +The same exception classes apply as v0.1. Two cases worth calling out: + +- **Cross-token access → `ForgeAPIError(404)`.** If token A asks for token B's session, the server returns 404 (not 403) so it can't be used to enumerate live session ids across token boundaries. +- **Session closed mid-flight → `ForgeAPIError(410)`.** The TTL sweeper or another caller's explicit `close()` may have ended the session between turns. + +```python +from clawdforge import ForgeAPIError + +try: + r = s.turn("hello") +except ForgeAPIError as e: + if e.status_code == 410: + # Session expired or was closed — start a fresh one. + ... + elif e.status_code == 404: + # Server forgot us, or we're asking for someone else's id. + ... + raise +``` + ## Construction ```python @@ -97,6 +194,22 @@ r = forge.run(prompt="Extract recipe data", files=[ft.file_token]) The server enforces `60 <= ttl_secs <= 86400` and returns 400 for out-of-range values, surfaced as `ForgeAPIError`. +### `session(*, agent="claude", meta=None) -> ContextManager[Session]` (v0.2) + +Block-form session — auto-closes on `__exit__`. See the "Multi-turn / Sessions (v0.2)" section above. + +### `create_session(*, agent="claude", meta=None) -> Session` (v0.2) + +Manual-lifecycle session. Caller is responsible for `s.close()` (typically in `finally`). `close()` is idempotent. + +### `list_sessions(*, include_closed=True) -> list[dict]` (v0.2) + +List sessions for the calling token. Pass `include_closed=False` to skip closed rows. + +### `get_session(session_id) -> dict` (v0.2) + +Fetch fresh server-side state for a session. Raises `ForgeAPIError(404)` if the id is unknown or belongs to another token. + ### Admin (admin-bootstrap-token gated) ```python diff --git a/clients/python/pyproject.toml b/clients/python/pyproject.toml index f4d1863..4e31b56 100644 --- a/clients/python/pyproject.toml +++ b/clients/python/pyproject.toml @@ -4,8 +4,8 @@ build-backend = "hatchling.build" [project] name = "clawdforge" -version = "0.1.0" -description = "Python SDK for the clawdforge LAN-only HTTP service (claude -p subprocess wrapper)." +version = "0.2.0" +description = "Python SDK for the clawdforge LAN-only HTTP service — single-turn /run plus multi-turn /sessions (v0.2)." readme = "README.md" requires-python = ">=3.10" authors = [{ name = "Kayos", email = "kayos@sulkta.com" }] diff --git a/clients/python/src/clawdforge/__init__.py b/clients/python/src/clawdforge/__init__.py index a104a1b..7fbffed 100644 --- a/clients/python/src/clawdforge/__init__.py +++ b/clients/python/src/clawdforge/__init__.py @@ -1,13 +1,20 @@ """clawdforge — Python SDK for the LAN-only clawdforge HTTP service. -Quickstart: +Quickstart (single-turn — v0.1): >>> from clawdforge import Forge >>> forge = Forge(base_url="http://192.168.0.5:8800", token="cf_...") >>> r = forge.run(prompt='Reply with JSON: {"hi": "ok"}') >>> r.result {'hi': 'ok'} + +Multi-turn (v0.2): + >>> with forge.session(agent="claude") as s: + ... r1 = s.turn("Read README.md and summarize") + ... r2 = s.turn("Now look at the auth flow") + >>> # session auto-closed on __exit__, even on exception """ -from ._models import AppToken, FileToken, RunResult +from ._models import AppToken, FileToken, RunResult, TurnResult +from ._session import Session from .client import Forge from .exceptions import ( ForgeAPIError, @@ -19,6 +26,8 @@ from .exceptions import ( __all__ = [ "Forge", "RunResult", + "TurnResult", + "Session", "FileToken", "AppToken", "ForgeError", @@ -27,4 +36,4 @@ __all__ = [ "ForgeTransportError", ] -__version__ = "0.1.0" +__version__ = "0.2.0" diff --git a/clients/python/src/clawdforge/_models.py b/clients/python/src/clawdforge/_models.py index 8852afa..7db819f 100644 --- a/clients/python/src/clawdforge/_models.py +++ b/clients/python/src/clawdforge/_models.py @@ -11,6 +11,78 @@ from typing import Any from .exceptions import ForgeAPIError, ForgeError +@dataclass(frozen=True, slots=True) +class TurnResult: + """Result of a successful `POST /sessions/{id}/turn`. + + `events` is the structured event stream the server collected from acpx + for this turn. Each event is a dict with at minimum a ``type`` key + (``"text"``, ``"tool_call"``, ``"thinking"``, ...) plus type-specific + payload fields. The SDK passes events through verbatim — see the v0.2 + server doc for the exhaustive shape. + """ + + ok: bool + session_id: str + turn_index: int + events: list[dict[str, Any]] + stop_reason: str | None + duration_ms: int + + def text(self) -> str: + """Concatenate all ``type == "text"`` events' ``content`` fields. + + Sugar for the common case where a caller wants the model's prose + reply and doesn't care about thinking / tool_call events. Returns + an empty string if no text events were emitted. + """ + parts: list[str] = [] + for ev in self.events: + if not isinstance(ev, dict): + continue + if ev.get("type") == "text": + content = ev.get("content") + if isinstance(content, str): + parts.append(content) + return "".join(parts) + + @classmethod + def from_response(cls, payload: dict[str, Any]) -> "TurnResult": + try: + ok = bool(payload.get("ok", True)) + session_id = str(payload["session_id"]) + turn_index = int(payload.get("turn_index", 0)) + events_raw = payload.get("events") or [] + if not isinstance(events_raw, list): + raise ForgeError( + f"malformed /sessions/turn response: events is " + f"{type(events_raw).__name__}, expected list" + ) + stop_reason = payload.get("stop_reason") + duration_ms = int(payload.get("duration_ms", 0)) + except (TypeError, ValueError, KeyError) as e: + raise ForgeError(f"malformed /sessions/turn response: {e}") from e + + # Same contract guard as RunResult: ok=False on a 200 is a server + # bug, surface it as ApiError so callers can't silently treat the + # turn as successful. + if not ok: + raise ForgeAPIError( + "server returned ok=False on /sessions/{id}/turn (contract violation)", + status_code=200, + body=payload, + ) + + return cls( + ok=ok, + session_id=session_id, + turn_index=turn_index, + events=list(events_raw), + stop_reason=stop_reason, + duration_ms=duration_ms, + ) + + @dataclass(frozen=True, slots=True) class RunResult: """Result of a successful `POST /run`. diff --git a/clients/python/src/clawdforge/_session.py b/clients/python/src/clawdforge/_session.py new file mode 100644 index 0000000..5fbc313 --- /dev/null +++ b/clients/python/src/clawdforge/_session.py @@ -0,0 +1,172 @@ +"""Session class for clawdforge v0.2 multi-turn API. + +A Session wraps a server-side session_id and a back-reference to the parent +Forge so all HTTP work goes through the Forge's pooled requests.Session. +There is no per-Session HTTP client. + +Idiomatic use: + + with forge.session(agent="claude") as s: + r = s.turn("hello") + print(r.text()) + +Manual use: + + s = forge.create_session(agent="claude") + try: + s.turn("hello") + finally: + s.close() + +`Session.close()` is idempotent — both because the server's DELETE /sessions +returns 200 on a re-close (with ``already_closed: true``) and because the +client short-circuits after the first successful close. Safe to call from +``finally`` blocks and ``__exit__`` without try/except. +""" +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any + +from ._models import TurnResult + +if TYPE_CHECKING: + from .client import Forge + + +log = logging.getLogger(__name__) + + +class Session: + """Handle to a server-side multi-turn session. + + Construct via ``forge.create_session(...)`` (manual lifecycle) or + ``forge.session(...)`` (context-manager auto-close — preferred). + + Attributes: + session_id: server-assigned id (also acpx's id). + agent: agent name this session is bound to (e.g. ``"claude"``). + created_at: unix-seconds timestamp from the server at create time. + cwd: working directory the server allocated for this session, or + None if the server did not return one. + """ + + __slots__ = ("_forge", "session_id", "agent", "created_at", "cwd", "_closed") + + def __init__( + self, + forge: "Forge", + *, + session_id: str, + agent: str, + created_at: int | None = None, + cwd: str | None = None, + ) -> None: + self._forge = forge + self.session_id = session_id + self.agent = agent + self.created_at = created_at + self.cwd = cwd + self._closed = False + + # -- context manager -------------------------------------------------- + + def __enter__(self) -> "Session": + return self + + def __exit__(self, exc_type: object, exc: object, tb: object) -> None: + # Swallow close() failures — the `with` block already finished its + # work, and a network blip on close shouldn't mask the real result + # (or a real exception unwinding through __exit__). + try: + self.close() + except Exception as e: + log.warning( + "clawdforge: session %s close failed during __exit__: %s", + self.session_id, + e, + ) + + # -- public ops ------------------------------------------------------- + + @property + def closed(self) -> bool: + """True after the first successful ``close()`` call. + + Note: this is the *client's* view. The server may have GC'd the + session out from under us (TTL sweeper) without us knowing. A + subsequent ``turn()`` would surface that as ``ForgeAPIError(410)``. + """ + return self._closed + + def turn( + self, + prompt: str, + *, + files: list[str] | None = None, + timeout_secs: int | None = None, + ) -> TurnResult: + """Send one turn. Returns a TurnResult. + + Args: + prompt: required, must be non-empty. + files: optional list of ``ff_...`` file tokens from + ``Forge.upload_file``. Server-side these resolve to staged + paths visible inside the session's cwd. + timeout_secs: optional per-turn subprocess timeout. Defaults + to the parent Forge's ``default_timeout_secs``. + + Raises: + ValueError: empty prompt or already-closed session. + ForgeAPIError: 4xx/5xx — notably 404 (session vanished or + cross-token access) and 410 (session is closed / no longer + live in the server process). + ForgeAuthError, ForgeTransportError, ForgeError: as v0.1. + """ + if not prompt: + raise ValueError("prompt must be non-empty") + if self._closed: + raise ValueError( + f"session {self.session_id} already closed by this client" + ) + + return self._forge._session_turn( + session_id=self.session_id, + prompt=prompt, + files=files, + timeout_secs=timeout_secs, + ) + + def state(self) -> dict[str, Any]: + """Fetch fresh server-side state for this session. + + Returns the raw ``GET /sessions/{id}`` payload — a dict with + ``session_id``, ``agent``, ``cwd``, ``created_at``, ``last_turn_at``, + ``turn_count``, ``closed_at``, ``live``, ``meta``. + """ + return self._forge.get_session(self.session_id) + + def close(self) -> None: + """Close the session server-side. Idempotent. + + Safe to call from ``finally`` blocks and ``__exit__``. Subsequent + calls short-circuit without an HTTP round-trip. + """ + if self._closed: + return + # Mark closed BEFORE the HTTP call so that even if the request + # raises, we don't enter a retry loop on every __exit__ pass. The + # server's DELETE is itself idempotent (200 on re-close) so the + # caller can also re-issue close() through a fresh client if they + # really need to. + self._closed = True + self._forge._session_close(self.session_id) + + # -- repr ------------------------------------------------------------- + + def __repr__(self) -> str: # pragma: no cover - cosmetic + state = "closed" if self._closed else "open" + return ( + f"Session(session_id={self.session_id!r}, agent={self.agent!r}, " + f"state={state})" + ) diff --git a/clients/python/src/clawdforge/client.py b/clients/python/src/clawdforge/client.py index 5f54357..867e53e 100644 --- a/clients/python/src/clawdforge/client.py +++ b/clients/python/src/clawdforge/client.py @@ -11,14 +11,17 @@ semantics of the prompt. """ from __future__ import annotations +import logging import os +from contextlib import contextmanager from pathlib import Path -from typing import IO, Any +from typing import IO, Any, Iterator from urllib.parse import quote import requests -from ._models import AppToken, FileToken, RunResult +from ._models import AppToken, FileToken, RunResult, TurnResult +from ._session import Session from .exceptions import ( ForgeAPIError, ForgeAuthError, @@ -27,6 +30,9 @@ from .exceptions import ( ) +log = logging.getLogger(__name__) + + _DEFAULT_MODEL = "sonnet" _DEFAULT_RUN_TIMEOUT_SECS = 120 # HTTP timeout = run's subprocess timeout + this margin so we don't bail @@ -393,3 +399,211 @@ class Forge: if isinstance(payload, dict): return bool(payload.get("ok", True)) return True + + # -- /sessions (v0.2) ------------------------------------------------- + # + # The session surface is purely additive on top of v0.1. v0.1 callers + # never touch /sessions and never see Session/TurnResult. The block + # form `forge.session(agent=...)` is the preferred entry point — it + # auto-closes on exit (including on exceptions) and matches the + # cross-language idiom in the v0.2 spec. + + def create_session( + self, + *, + agent: str = "claude", + meta: dict[str, Any] | None = None, + ) -> Session: + """``POST /sessions``: create a multi-turn session. + + Manual lifecycle. Caller is responsible for ``s.close()`` — usually + via ``try/finally``. For most use cases, prefer the block form + ``with forge.session(...) as s:`` which auto-closes. + + Args: + agent: which agent the session binds to. Defaults to ``"claude"``. + meta: optional caller metadata persisted server-side in the + session ledger. + + Returns: + Session + """ + body: dict[str, Any] = {"agent": agent} + if meta is not None: + body["meta"] = meta + + # /sessions create is bounded by acpx session-create handshake — + # use the same generous default as /run so we don't bail mid-handshake. + http_timeout = self.default_timeout_secs + self.http_timeout_margin + payload = self._request("POST", "/sessions", json_body=body, timeout=http_timeout) + if not isinstance(payload, dict): + raise ForgeError( + f"unexpected /sessions response type: {type(payload).__name__}" + ) + try: + session_id = str(payload["session_id"]) + except KeyError as e: + raise ForgeError(f"malformed /sessions response: missing {e}") from e + + return Session( + self, + session_id=session_id, + agent=str(payload.get("agent", agent)), + created_at=payload.get("created_at"), + cwd=payload.get("cwd"), + ) + + @contextmanager + def session( + self, + *, + agent: str = "claude", + meta: dict[str, Any] | None = None, + ) -> Iterator[Session]: + """Context-manager shortcut: create + auto-close a session. + + Preferred form. Equivalent to:: + + s = forge.create_session(agent=agent, meta=meta) + try: + yield s + finally: + s.close() # idempotent; swallows close-time blips + + Use ``forge.create_session(...)`` directly if you need the Session + to outlive the call site (e.g. background worker threads). + """ + s = self.create_session(agent=agent, meta=meta) + try: + yield s + finally: + try: + s.close() + except Exception as e: + # Already closed / network blip during teardown — log and + # swallow so we don't mask the real exception unwinding + # through this finally block. + log.warning( + "clawdforge: session %s close failed during context-manager exit: %s", + s.session_id, + e, + ) + + def get_session(self, session_id: str) -> dict[str, Any]: + """``GET /sessions/{id}``: fetch session state. + + Returns the raw payload from the server: a dict with + ``session_id``, ``agent``, ``cwd``, ``created_at``, ``last_turn_at``, + ``turn_count``, ``closed_at`` (nullable), ``live`` (bool), ``meta``. + + Raises: + ValueError: empty ``session_id``. + ForgeAPIError(404): unknown id, or id belongs to another token + (the server intentionally returns 404 not 403 to avoid + leaking session existence across token boundaries). + """ + if not session_id: + raise ValueError("session_id must be non-empty") + slug = quote(session_id, safe="") + payload = self._request("GET", f"/sessions/{slug}", timeout=_HEALTHZ_TIMEOUT_SECS) + if not isinstance(payload, dict): + raise ForgeError( + f"unexpected /sessions/{{id}} response type: {type(payload).__name__}" + ) + return payload + + def list_sessions(self, *, include_closed: bool = True) -> list[dict[str, Any]]: + """``GET /sessions``: list sessions for the calling token. + + Returns: + List of session-row dicts (newest-first). Each row has + ``session_id``, ``app_name``, ``agent``, ``cwd``, ``created_at``, + ``last_turn_at``, ``turn_count``, ``closed_at``, ``meta``. + + Args: + include_closed: if False, only sessions where ``closed_at IS + NULL`` are returned. Default True (mirrors server default). + """ + params: dict[str, Any] = {} + # Only forward the non-default to keep the URL stable for the + # default case. + if include_closed is False: + params["include_closed"] = "false" + # We can't pass `params=` through `_request` (no kwarg) without + # broadening that helper; use the session directly for this one + # GET. Mirrors the pattern of `_request` exactly otherwise. + try: + resp = self._session.request( + "GET", + self._url("/sessions"), + headers=self._headers(), + params=params or None, + timeout=_HEALTHZ_TIMEOUT_SECS, + ) + except requests.RequestException as e: + raise ForgeTransportError(f"transport: {e}") from e + payload = self._parse(resp) + if not isinstance(payload, dict) or "sessions" not in payload: + raise ForgeError("unexpected /sessions response") + rows = payload["sessions"] + if not isinstance(rows, list): + raise ForgeError( + f"unexpected /sessions rows type: {type(rows).__name__}" + ) + return rows + + # -- internal session helpers (called from Session) ------------------- + + def _session_turn( + self, + *, + session_id: str, + prompt: str, + files: list[str] | None, + timeout_secs: int | None, + ) -> TurnResult: + """Internal: ``POST /sessions/{id}/turn``. Don't call directly — go + through ``Session.turn``. + """ + if timeout_secs is not None: + if not isinstance(timeout_secs, int) or isinstance(timeout_secs, bool): + raise ValueError("timeout_secs must be int") + if timeout_secs < _RUN_TIMEOUT_MIN or timeout_secs > _RUN_TIMEOUT_MAX: + raise ValueError( + f"timeout_secs out of range " + f"({_RUN_TIMEOUT_MIN}..{_RUN_TIMEOUT_MAX}), got {timeout_secs}" + ) + + body: dict[str, Any] = {"prompt": prompt} + if files: + body["files"] = list(files) + if timeout_secs is not None: + body["timeout_secs"] = timeout_secs + + effective_run_timeout = ( + timeout_secs if timeout_secs is not None else self.default_timeout_secs + ) + http_timeout = effective_run_timeout + self.http_timeout_margin + + slug = quote(session_id, safe="") + payload = self._request( + "POST", + f"/sessions/{slug}/turn", + json_body=body, + timeout=http_timeout, + ) + if not isinstance(payload, dict): + raise ForgeError( + f"unexpected /sessions/turn response type: {type(payload).__name__}" + ) + return TurnResult.from_response(payload) + + def _session_close(self, session_id: str) -> None: + """Internal: ``DELETE /sessions/{id}``. Idempotent server-side. + + Don't call directly — go through ``Session.close``. Errors are + swallowed by the calling Session's ``__exit__``/``finally`` + wrapping; raised here so direct callers can observe them. + """ + slug = quote(session_id, safe="") + self._request("DELETE", f"/sessions/{slug}", timeout=_HEALTHZ_TIMEOUT_SECS) diff --git a/clients/python/tests/test_sessions.py b/clients/python/tests/test_sessions.py new file mode 100644 index 0000000..567f3e8 --- /dev/null +++ b/clients/python/tests/test_sessions.py @@ -0,0 +1,479 @@ +"""Tests for the v0.2 Session / TurnResult surface of the clawdforge SDK. + +Same test framework as test_client.py — `unittest` + `responses` to +intercept HTTP. No live network. v0.1 paths covered there are not +re-exercised here; the regression test_v01_run_unchanged is a smoke check +to catch accidental breakage of the existing /run path while we layered on +the session methods. +""" +from __future__ import annotations + +import json +import unittest +from typing import Any + +import requests +import responses + +from clawdforge import ( + Forge, + ForgeAPIError, + Session, + TurnResult, +) + + +BASE_URL = "http://192.168.0.5:8800" +TOKEN = "cf_test_token_xxxxxxxx" + + +def _forge() -> Forge: + return Forge(base_url=BASE_URL, token=TOKEN, default_timeout_secs=60) + + +def _create_payload(session_id: str = "sess_abc", agent: str = "claude") -> dict[str, Any]: + return { + "ok": True, + "session_id": session_id, + "agent": agent, + "created_at": 1_700_000_000, + "cwd": "/tmp/acpx-sessions/sess_abc", + } + + +def _turn_payload( + session_id: str = "sess_abc", + turn_index: int = 1, + events: list[dict] | None = None, + stop_reason: str = "end_turn", + duration_ms: int = 1234, +) -> dict[str, Any]: + return { + "ok": True, + "session_id": session_id, + "turn_index": turn_index, + "events": events if events is not None else [{"type": "text", "content": "hi"}], + "stop_reason": stop_reason, + "duration_ms": duration_ms, + } + + +class TestSessionBlock(unittest.TestCase): + """Block / context-manager form.""" + + @responses.activate + def test_session_block_creates_and_closes(self) -> None: + """`with forge.session() as s:` issues create on enter and close on exit.""" + captured: dict[str, Any] = {"requests": []} + + def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: + captured["requests"].append(("POST", "/sessions")) + return (200, {}, json.dumps(_create_payload())) + + def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: + captured["requests"].append(("DELETE", "/sessions/sess_abc")) + return (200, {}, json.dumps({"ok": True})) + + responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb) + responses.add_callback( + responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb + ) + + with _forge() as f: + with f.session(agent="claude") as s: + self.assertIsInstance(s, Session) + self.assertEqual(s.session_id, "sess_abc") + self.assertEqual(s.agent, "claude") + self.assertFalse(s.closed) + # Outside the block: close was issued + self.assertTrue(s.closed) + + self.assertEqual( + captured["requests"], + [("POST", "/sessions"), ("DELETE", "/sessions/sess_abc")], + ) + + @responses.activate + def test_session_create_sends_agent_and_meta(self) -> None: + captured: dict[str, Any] = {} + + def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: + captured["body"] = json.loads(request.body) + captured["auth"] = request.headers.get("Authorization") + return (200, {}, json.dumps(_create_payload(agent="claude"))) + + def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: + return (200, {}, json.dumps({"ok": True})) + + responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb) + responses.add_callback( + responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb + ) + + with _forge() as f, f.session(agent="claude", meta={"caller": "test"}) as s: + self.assertEqual(s.session_id, "sess_abc") + + self.assertEqual(captured["auth"], f"Bearer {TOKEN}") + self.assertEqual(captured["body"], {"agent": "claude", "meta": {"caller": "test"}}) + + +class TestSessionTurn(unittest.TestCase): + @responses.activate + def test_session_turn_round_trip(self) -> None: + """Mock POST /sessions and POST /sessions/{id}/turn — assert TurnResult shape.""" + captured: dict[str, Any] = {} + + def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: + return (200, {}, json.dumps(_create_payload())) + + def turn_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: + captured["body"] = json.loads(request.body) + return ( + 200, + {}, + json.dumps( + _turn_payload( + turn_index=2, + events=[ + {"type": "thinking", "content": "..."}, + {"type": "tool_call", "name": "Read", "args": {}}, + {"type": "text", "content": "hello"}, + ], + duration_ms=4321, + ) + ), + ) + + def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: + return (200, {}, json.dumps({"ok": True})) + + responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb) + responses.add_callback( + responses.POST, f"{BASE_URL}/sessions/sess_abc/turn", callback=turn_cb + ) + responses.add_callback( + responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb + ) + + with _forge() as f, f.session(agent="claude") as s: + r = s.turn("hello", files=["ff_xyz"], timeout_secs=42) + + self.assertIsInstance(r, TurnResult) + self.assertTrue(r.ok) + self.assertEqual(r.session_id, "sess_abc") + self.assertEqual(r.turn_index, 2) + self.assertEqual(r.stop_reason, "end_turn") + self.assertEqual(r.duration_ms, 4321) + self.assertEqual(len(r.events), 3) + self.assertEqual(captured["body"]["prompt"], "hello") + self.assertEqual(captured["body"]["files"], ["ff_xyz"]) + self.assertEqual(captured["body"]["timeout_secs"], 42) + + @responses.activate + def test_session_turn_empty_prompt_rejected_locally(self) -> None: + responses.add( + responses.POST, + f"{BASE_URL}/sessions", + json=_create_payload(), + status=200, + ) + responses.add( + responses.DELETE, + f"{BASE_URL}/sessions/sess_abc", + json={"ok": True}, + status=200, + ) + with _forge() as f, f.session() as s, self.assertRaises(ValueError): + s.turn("") + + +class TestSessionCloseIdempotent(unittest.TestCase): + @responses.activate + def test_session_close_idempotent(self) -> None: + """Calling close() twice — second is a no-op (no second HTTP request).""" + captured: dict[str, Any] = {"close_count": 0} + + def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: + return (200, {}, json.dumps(_create_payload())) + + def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: + captured["close_count"] += 1 + return (200, {}, json.dumps({"ok": True})) + + responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb) + responses.add_callback( + responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb + ) + + with _forge() as f: + s = f.create_session(agent="claude") + s.close() + self.assertTrue(s.closed) + s.close() # second call must NOT issue another DELETE + s.close() # ditto + + self.assertEqual(captured["close_count"], 1) + + +class TestSessionCloseOnException(unittest.TestCase): + @responses.activate + def test_session_close_on_exception(self) -> None: + """Exception raised inside `with` block — session still gets closed.""" + captured: dict[str, Any] = {"closed": False} + + def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: + return (200, {}, json.dumps(_create_payload())) + + def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: + captured["closed"] = True + return (200, {}, json.dumps({"ok": True})) + + responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb) + responses.add_callback( + responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb + ) + + class Boom(Exception): + pass + + with _forge() as f: + with self.assertRaises(Boom): + with f.session() as s: + self.assertEqual(s.session_id, "sess_abc") + raise Boom("kaboom inside the with") + + self.assertTrue(captured["closed"]) + + +class TestListSessions(unittest.TestCase): + @responses.activate + def test_list_sessions(self) -> None: + rows = [ + { + "session_id": "sess_a", + "app_name": "cauldron", + "agent": "claude", + "cwd": "/tmp/x", + "created_at": 100, + "last_turn_at": 200, + "turn_count": 3, + "closed_at": None, + "meta": None, + }, + { + "session_id": "sess_b", + "app_name": "cauldron", + "agent": "claude", + "cwd": "/tmp/y", + "created_at": 50, + "last_turn_at": None, + "turn_count": 0, + "closed_at": 75, + "meta": {"x": 1}, + }, + ] + responses.add( + responses.GET, + f"{BASE_URL}/sessions", + json={"ok": True, "sessions": rows, "count": len(rows)}, + status=200, + ) + + with _forge() as f: + out = f.list_sessions() + + self.assertEqual(len(out), 2) + self.assertEqual(out[0]["session_id"], "sess_a") + self.assertIsNone(out[0]["closed_at"]) + self.assertEqual(out[1]["closed_at"], 75) + + @responses.activate + def test_list_sessions_include_closed_false(self) -> None: + captured: dict[str, Any] = {} + + def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: + captured["url"] = request.url + return (200, {}, json.dumps({"ok": True, "sessions": [], "count": 0})) + + responses.add_callback(responses.GET, f"{BASE_URL}/sessions", callback=cb) + with _forge() as f: + f.list_sessions(include_closed=False) + self.assertIn("include_closed=false", captured["url"]) + + +class TestGetSessionState(unittest.TestCase): + @responses.activate + def test_get_session_state(self) -> None: + responses.add( + responses.GET, + f"{BASE_URL}/sessions/sess_abc", + json={ + "ok": True, + "session_id": "sess_abc", + "agent": "claude", + "cwd": "/tmp/x", + "created_at": 100, + "last_turn_at": 200, + "turn_count": 5, + "closed_at": None, + "live": True, + "meta": {"caller": "test"}, + }, + status=200, + ) + with _forge() as f: + state = f.get_session("sess_abc") + self.assertEqual(state["session_id"], "sess_abc") + self.assertEqual(state["turn_count"], 5) + self.assertIsNone(state["closed_at"]) + self.assertTrue(state["live"]) + + def test_get_session_empty_id_rejected(self) -> None: + with _forge() as f, self.assertRaises(ValueError): + f.get_session("") + + @responses.activate + def test_session_state_method_round_trips(self) -> None: + """Session.state() calls through to forge.get_session(self.session_id).""" + + def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: + return (200, {}, json.dumps(_create_payload())) + + def state_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: + return ( + 200, + {}, + json.dumps( + { + "ok": True, + "session_id": "sess_abc", + "agent": "claude", + "cwd": "/tmp/x", + "created_at": 100, + "last_turn_at": None, + "turn_count": 0, + "closed_at": None, + "live": True, + "meta": None, + } + ), + ) + + def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: + return (200, {}, json.dumps({"ok": True})) + + responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb) + responses.add_callback( + responses.GET, f"{BASE_URL}/sessions/sess_abc", callback=state_cb + ) + responses.add_callback( + responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb + ) + + with _forge() as f, f.session() as s: + st = s.state() + self.assertEqual(st["session_id"], "sess_abc") + + +class TestSession404IsApiError(unittest.TestCase): + @responses.activate + def test_session_404_is_api_error(self) -> None: + """Cross-token session access — server returns 404, SDK surfaces ForgeAPIError(404).""" + responses.add( + responses.GET, + f"{BASE_URL}/sessions/sess_other_token", + json={"detail": "session not found"}, + status=404, + ) + with _forge() as f, self.assertRaises(ForgeAPIError) as ctx: + f.get_session("sess_other_token") + self.assertEqual(ctx.exception.status_code, 404) + + @responses.activate + def test_session_turn_404_is_api_error(self) -> None: + """Same on /sessions/{id}/turn — 404 surfaces as ForgeAPIError(404).""" + + def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: + return (200, {}, json.dumps(_create_payload())) + + def turn_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: + return (404, {}, json.dumps({"detail": "session not found"})) + + def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: + return (200, {}, json.dumps({"ok": True})) + + responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb) + responses.add_callback( + responses.POST, f"{BASE_URL}/sessions/sess_abc/turn", callback=turn_cb + ) + responses.add_callback( + responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb + ) + + with _forge() as f, f.session() as s, self.assertRaises(ForgeAPIError) as ctx: + s.turn("hello") + self.assertEqual(ctx.exception.status_code, 404) + + +class TestTurnResultText(unittest.TestCase): + def test_turn_result_text_concatenates_text_events(self) -> None: + """`.text()` concatenates `type=='text'` events' content; ignores others.""" + r = TurnResult.from_response( + { + "ok": True, + "session_id": "sess_x", + "turn_index": 1, + "events": [ + {"type": "thinking", "content": "should not appear"}, + {"type": "text", "content": "hello "}, + {"type": "tool_call", "name": "Read", "args": {}, "result": {}}, + {"type": "text", "content": "world"}, + # malformed event must not blow up text() + {"type": "text"}, # no content + "not a dict", # type: ignore[list-item] + ], + "stop_reason": "end_turn", + "duration_ms": 1, + } + ) + self.assertEqual(r.text(), "hello world") + + def test_turn_result_text_empty_when_no_text_events(self) -> None: + r = TurnResult.from_response( + { + "ok": True, + "session_id": "sess_x", + "turn_index": 1, + "events": [{"type": "tool_call", "name": "X"}], + "stop_reason": "end_turn", + "duration_ms": 1, + } + ) + self.assertEqual(r.text(), "") + + +class TestV01RunUnchanged(unittest.TestCase): + """Regression: existing /run path still works unchanged after v0.2 layer.""" + + @responses.activate + def test_v01_run_unchanged(self) -> None: + responses.add( + responses.POST, + f"{BASE_URL}/run", + json={ + "ok": True, + "result": {"hello": "world"}, + "duration_ms": 100, + "stop_reason": "end_turn", + }, + status=200, + ) + with _forge() as f: + r = f.run(prompt='Reply with JSON: {"hello": "world"}') + self.assertTrue(r.ok) + self.assertEqual(r.result, {"hello": "world"}) + self.assertEqual(r.duration_ms, 100) + + +if __name__ == "__main__": + unittest.main()