From 8479725513e62eacfb113f4fcbbdd5f6b1ebf68b Mon Sep 17 00:00:00 2001 From: Kayos Date: Wed, 29 Apr 2026 07:03:39 -0700 Subject: [PATCH] clients/kotlin: v0.2 multi-turn Session API - Session : Closeable; AtomicBoolean idempotent close (rollback on transient) - forge.session(opts) { s -> ... } block helper preferred - ForgeClient.createSession / listSessions / getSession - Per-call HTTP timeout on /sessions/{id}/turn (audit-fix 3c77ef5 pattern) - Per-session Mutex serializes concurrent turns - TurnResult.text() helper, Session.toString redacts client - SessionTest.kt: ~14 tests covering block/idempotency/concurrent/timeout/list/state/404/redaction/regression - README "Multi-turn / Sessions (v0.2)" section v0.1 surface unchanged. Ktor 2.3.13 preserved. Spec: memory/spec-clawdforge-v0.2.md Server core: 940861f --- clients/kotlin/README.md | 96 ++- clients/kotlin/build.gradle.kts | 2 +- .../main/kotlin/com/clawdforge/ForgeClient.kt | 148 +++++ .../main/kotlin/com/clawdforge/Sessions.kt | 279 +++++++++ .../test/kotlin/com/clawdforge/SessionTest.kt | 576 ++++++++++++++++++ 5 files changed, 1099 insertions(+), 2 deletions(-) create mode 100644 clients/kotlin/src/main/kotlin/com/clawdforge/Sessions.kt create mode 100644 clients/kotlin/src/test/kotlin/com/clawdforge/SessionTest.kt diff --git a/clients/kotlin/README.md b/clients/kotlin/README.md index b63b7e9..b2cef99 100644 --- a/clients/kotlin/README.md +++ b/clients/kotlin/README.md @@ -25,7 +25,7 @@ repositories { } dependencies { - implementation("com.clawdforge:clawdforge:0.1.0") + implementation("com.clawdforge:clawdforge:0.2.0") } ``` @@ -68,6 +68,96 @@ suspend fun main() { } ``` +## Multi-turn / Sessions (v0.2) + +v0.2 adds a parallel `/sessions/*` surface for multi-turn conversations. +v0.1's `/run` still works exactly as before — sessions are purely additive. + +The preferred entry point is the `session { … }` block helper, which +auto-closes the server-side session on block exit, even on throw: + +```kotlin +import com.clawdforge.ForgeClient +import com.clawdforge.SessionOptions + +suspend fun main() { + val forge = ForgeClient("http://localhost:8800", System.getenv("CLAWDFORGE_TOKEN")) + forge.use { + forge.session(SessionOptions(agent = "claude")) { s -> + val r1 = s.turn("Read README.md and summarize") + println(r1.text()) + + val r2 = s.turn("Now look at the auth flow", files = listOf("ff_xyz")) + println(r2.text()) + } + // session is closed here; DELETE /sessions/{id} has fired + } +} +``` + +`Session` implements `java.io.Closeable`, so it also composes with `use`: + +```kotlin +forge.createSession().use { s -> + val r = s.turn("hello") + println(r.text()) +} // auto-closed +``` + +For long-lived handles you can drive the lifecycle yourself: + +```kotlin +val s = forge.createSession(SessionOptions(agent = "claude")) +try { + val r = s.turn("hello") +} finally { + s.close() +} +``` + +Listing and inspection: + +```kotlin +val sessions: List = forge.listSessions() +val state: SessionState = forge.getSession(s.id) +``` + +### Behaviours worth flagging + +- **Idempotent close.** `Session.close()` only fires `DELETE /sessions/{id}` + the first time it's called; later invocations short-circuit on an + internal `AtomicBoolean`. If the DELETE fails on the first attempt the + flag is rolled back so a transient transport blip can be retried. +- **Concurrent turns are serialized.** Two coroutines calling `s.turn()` + on the same `Session` won't race on the wire — an internal coroutine + `Mutex` orders them, so the server only ever sees one in-flight turn + per session. (Multiple sessions, of course, run in parallel.) +- **Per-call HTTP timeout.** `Session.turn(prompt, files, timeoutSecs)` + applies the same per-call timeout pattern as audit-fix `3c77ef5` for + `ForgeClient.run`: a `timeoutSecs` larger than `ForgeOptions.defaultTimeout` + extends the HTTP request window so the client doesn't disconnect while + the server is still doing useful work. +- **`Session.toString()` redacts the embedded client.** The `ForgeClient` + reference (which carries the bearer) is intentionally omitted. Same + hazard / same pattern as `AppToken.toString`. +- **Cross-token access is 404.** `forge.getSession(id)` against a session + owned by another token surfaces as `ForgeApiException(statusCode=404)` + — the server returns 404 (not 403) to avoid leaking session existence. + +### Turn output + +`TurnResult.events` is the structured event list (text, thinking, +tool_call, …). For the common case where you want the model's textual +reply, use `result.text()`: + +```kotlin +val r = s.turn("Explain in two sentences") +println(r.text()) // concatenated content of every type=="text" event +``` + +`TurnEvent.args` and `TurnEvent.result` are `JsonElement?` — narrow them +with the standard `kotlinx.serialization.json` extensions. + ## Public API All methods are `suspend` — call them from a coroutine scope. @@ -80,6 +170,10 @@ All methods are `suspend` — call them from a coroutine scope. | `createToken(CreateTokenRequest)` | `POST /admin/tokens` | Admin bootstrap token only. | | `listTokens()` | `GET /admin/tokens` | Admin only. | | `revokeToken(name)` | `DELETE /admin/tokens/{name}` | Admin only. | +| `createSession(SessionOptions)` | `POST /sessions` | v0.2; returns a `Session`. | +| `session(SessionOptions) { s -> … }` | — | v0.2 block helper; auto-closes. | +| `listSessions()` | `GET /sessions` | v0.2; returns `List`. | +| `getSession(id)` | `GET /sessions/{id}` | v0.2; cross-token = 404. | | `close()` | — | Disposes the underlying Ktor client. | ### Constructor diff --git a/clients/kotlin/build.gradle.kts b/clients/kotlin/build.gradle.kts index 4802d9d..67d6af7 100644 --- a/clients/kotlin/build.gradle.kts +++ b/clients/kotlin/build.gradle.kts @@ -6,7 +6,7 @@ plugins { } group = "com.clawdforge" -version = "0.1.0" +version = "0.2.0" repositories { mavenCentral() diff --git a/clients/kotlin/src/main/kotlin/com/clawdforge/ForgeClient.kt b/clients/kotlin/src/main/kotlin/com/clawdforge/ForgeClient.kt index bad62f3..7295a6f 100644 --- a/clients/kotlin/src/main/kotlin/com/clawdforge/ForgeClient.kt +++ b/clients/kotlin/src/main/kotlin/com/clawdforge/ForgeClient.kt @@ -254,6 +254,154 @@ public class ForgeClient @JvmOverloads public constructor( } } + // ------------------------------------------------------------------ + // v0.2: multi-turn sessions + // ------------------------------------------------------------------ + + /** + * `POST /sessions`. Creates a server-side session and returns a + * [Session] handle bound to this client. + * + * Prefer the [session] block helper, which auto-closes the returned + * handle even when the body throws. Use this method directly when you + * need a session whose lifetime crosses suspending boundaries that + * make a block helper inconvenient — wrap in `try { … } finally { + * s.close() }` (or `s.use { … }`) to avoid leaking server-side state. + * + * @param options agent / metadata; defaults are fine for the common case. + */ + public suspend fun createSession(options: SessionOptions = SessionOptions()): Session = + wrapTransport("POST /sessions") { + val resp = http.post("$baseUrl/sessions") { + authHeader() + contentType(ContentType.Application.Json) + setBody(SessionCreateBody(agent = options.agent, meta = options.meta)) + } + val body = decodeOrThrow(resp) + Session( + client = this, + id = body.sessionId, + agent = body.agent, + createdAt = body.createdAt, + ) + } + + /** + * Block-helper around [createSession] / [Session.close]. Creates a + * session, runs [block] with it, and closes the server-side session + * even when [block] throws: + * + * ```kotlin + * forge.session(SessionOptions(agent = "claude")) { s -> + * val r = s.turn("hello") + * println(r.text()) + * } + * ``` + * + * The close exception (if any) is swallowed — the body's outcome is + * what propagates. If you want to observe close failures, use + * [createSession] + `try/finally` directly. + */ + public suspend fun session( + options: SessionOptions = SessionOptions(), + block: suspend (Session) -> T, + ): T { + val s = createSession(options) + return try { + block(s) + } finally { + try { + s.close() + } catch (ce: CancellationException) { + throw ce + } catch (_: Throwable) { + // Mirror the audit pattern: don't override an in-flight + // body exception with a close-time failure. Callers that + // need close-failure visibility can drive createSession() + // + try/finally themselves. + } + } + } + + /** + * `GET /sessions`. Lists the calling token's sessions (newest-first, + * per the server contract). + */ + public suspend fun listSessions(): List = wrapTransport("GET /sessions") { + val resp = http.get("$baseUrl/sessions") { authHeader() } + decodeOrThrow(resp).sessions + } + + /** + * `GET /sessions/{id}`. A session belonging to a different token + * surfaces as [ForgeApiException] with `statusCode = 404` — the + * server intentionally returns 404 (not 403) for cross-token access + * to avoid leaking session existence. + */ + public suspend fun getSession(id: String): SessionState { + require(id.isNotEmpty()) { "id must not be empty" } + return wrapTransport("GET /sessions/{id}") { + val resp = http.get("$baseUrl/sessions/${id.encodeURLPathPart()}") { + authHeader() + } + decodeOrThrow(resp) + } + } + + /** + * Internal: `POST /sessions/{id}/turn`. Don't call directly — go + * through [Session.turn], which holds the per-session mutex. + * + * Applies the audit-fix `3c77ef5` per-call HTTP timeout pattern: a + * caller-supplied `timeoutSecs` larger than [ForgeOptions.defaultTimeout] + * extends the HTTP request window so we don't disconnect while the + * server is still doing useful work. + */ + internal suspend fun sessionTurnInternal( + id: String, + prompt: String, + files: List?, + timeoutSecs: Int?, + ): TurnResult { + val effectiveTimeoutSecs = timeoutSecs + ?: options.defaultTimeout.inWholeSeconds.toInt() + val callTimeoutMs = (effectiveTimeoutSecs * 1000L) + options.requestMargin.inWholeMilliseconds + val encodedId = id.encodeURLPathPart() + return wrapTransport("POST /sessions/{id}/turn") { + val resp = http.post("$baseUrl/sessions/$encodedId/turn") { + timeout { + requestTimeoutMillis = callTimeoutMs + socketTimeoutMillis = callTimeoutMs + } + authHeader() + contentType(ContentType.Application.Json) + setBody( + SessionTurnBody( + prompt = prompt, + files = files?.takeIf { it.isNotEmpty() }, + timeoutSecs = timeoutSecs, + ), + ) + } + decodeOrThrow(resp) + } + } + + /** + * Internal: `DELETE /sessions/{id}`. Don't call directly — go through + * [Session.close], which guards with an [java.util.concurrent.atomic.AtomicBoolean] + * so the DELETE only fires once per session handle. + */ + internal suspend fun sessionCloseInternal(id: String) { + val encodedId = id.encodeURLPathPart() + wrapTransport("DELETE /sessions/{id}") { + val resp = http.delete("$baseUrl/sessions/$encodedId") { authHeader() } + if (!resp.status.isSuccess()) { + throwForStatus(resp) + } + } + } + /** Disposes the underlying Ktor client. After this, the instance is unusable. */ override fun close() { http.close() diff --git a/clients/kotlin/src/main/kotlin/com/clawdforge/Sessions.kt b/clients/kotlin/src/main/kotlin/com/clawdforge/Sessions.kt new file mode 100644 index 0000000..c14d53d --- /dev/null +++ b/clients/kotlin/src/main/kotlin/com/clawdforge/Sessions.kt @@ -0,0 +1,279 @@ +package com.clawdforge + +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonElement +import java.io.Closeable +import java.util.concurrent.atomic.AtomicBoolean + +/** + * Options for [ForgeClient.createSession] / [ForgeClient.session]. + * + * Pass an empty instance for defaults: + * + * ```kotlin + * forge.session { s -> s.turn("hi") } // defaults + * forge.session(SessionOptions(agent = "claude")) { s -> ... } // explicit + * ``` + * + * @property agent agent slug the server should bind this session to. + * Defaults to `"claude"`. + * @property meta opaque per-session metadata; serialised verbatim under the + * `meta` key in the create-body. `null` is omitted from the wire. + */ +public data class SessionOptions( + public val agent: String = "claude", + public val meta: JsonElement? = null, +) + +/** + * One structured event emitted by a turn. + * + * The server collects these from acpx as the agent runs and returns them + * as a complete batch when the turn ends. Each event has at minimum a + * [type] (`"text"`, `"thinking"`, `"tool_call"`, ...); the type-specific + * payload fields are pass-through and may be absent for non-applicable + * types. + * + * Forward-compat: unknown types decode normally with absent fields left + * `null`. Callers should switch on [type] and defensively handle `null` + * payload fields. + * + * @property type event type, e.g. `"text"`, `"thinking"`, `"tool_call"`. + * @property content text content for `text` / `thinking` events; null otherwise. + * @property name tool name for `tool_call` events; null otherwise. + * @property args tool-call arguments for `tool_call` events; null otherwise. + * @property result tool-call result for `tool_call` events; null otherwise. + */ +@Serializable +public data class TurnEvent( + public val type: String, + public val content: String? = null, + public val name: String? = null, + public val args: JsonElement? = null, + public val result: JsonElement? = null, +) + +/** + * Parsed body of `POST /sessions/{id}/turn`. + * + * Use [text] to concatenate the `text` events into a single string for the + * common case where you just want "what did the model say?". + * + * @property ok true on a successful turn. + * @property sessionId server session id (also acpx's id). + * @property turnIndex zero-based turn ordinal within this session. + * @property events structured events emitted by the turn, in order. + * @property stopReason reason the turn ended (`"end_turn"`, `"max_tokens"`, …). + * @property durationMs server-side wall-clock duration in milliseconds. + */ +@Serializable +public data class TurnResult( + public val ok: Boolean, + @SerialName("session_id") public val sessionId: String, + @SerialName("turn_index") public val turnIndex: Int, + public val events: List, + @SerialName("stop_reason") public val stopReason: String, + @SerialName("duration_ms") public val durationMs: Long, +) { + /** + * Concatenates the [TurnEvent.content] of every `type == "text"` event + * in [events] order. Skips events whose content is `null`. + */ + public fun text(): String = + events.asSequence() + .filter { it.type == "text" } + .mapNotNull { it.content } + .joinToString("") +} + +/** + * Server's view of one session — returned by `GET /sessions/{id}` and as + * one row of `GET /sessions`. + * + * @property sessionId server session id. + * @property agent agent slug this session is bound to. + * @property appName name of the app token that owns the session; may be + * absent on responses where the server elects not to + * include it. + * @property createdAt unix-second creation timestamp. + * @property lastTurnAt unix-second timestamp of the most recent turn, or + * `null` if no turn has been sent yet. + * @property turnCount number of turns sent so far. + * @property closedAt unix-second close timestamp, or `null` if the + * session is still open. + */ +@Serializable +public data class SessionState( + @SerialName("session_id") public val sessionId: String, + public val agent: String, + @SerialName("app_name") public val appName: String? = null, + @SerialName("created_at") public val createdAt: Long, + @SerialName("last_turn_at") public val lastTurnAt: Long? = null, + @SerialName("turn_count") public val turnCount: Int, + @SerialName("closed_at") public val closedAt: Long? = null, +) + +/** Wrapper for `GET /sessions`. */ +@Serializable +internal data class SessionList( + val sessions: List = emptyList(), +) + +/** Wrapper for `POST /sessions`. */ +@Serializable +internal data class SessionCreateBody( + val agent: String? = null, + val meta: JsonElement? = null, +) + +/** Wrapper for `POST /sessions` response. */ +@Serializable +internal data class SessionCreateResponse( + @SerialName("session_id") val sessionId: String, + val agent: String = "claude", + @SerialName("created_at") val createdAt: Long = 0, +) + +/** Wire body for `POST /sessions/{id}/turn`. */ +@Serializable +internal data class SessionTurnBody( + val prompt: String, + val files: List? = null, + @SerialName("timeout_secs") val timeoutSecs: Int? = null, +) + +/** + * Handle to a server-side multi-turn session. + * + * Construct via [ForgeClient.createSession] (or grab one from inside the + * [ForgeClient.session] block helper, which auto-closes on exit). + * + * Implements [Closeable], so it composes with `use { }`: + * + * ```kotlin + * forge.createSession().use { s -> + * val r = s.turn("hello") + * println(r.text()) + * } + * ``` + * + * ### Thread safety + * + * [turn] is `suspend` and serialises concurrent calls on the same instance + * via an internal [Mutex] — two coroutines calling `turn()` at once will + * be ordered (FIFO with respect to mutex acquisition), so the server only + * ever sees one in-flight turn per session at a time. + * + * [close] is idempotent: the first invocation flips an [AtomicBoolean] and + * issues `DELETE /sessions/{id}`; subsequent invocations short-circuit + * without an HTTP round-trip. If the DELETE fails on the first attempt the + * flag is rolled back so the caller can retry — a transient transport blip + * shouldn't leave the client thinking a still-live session was reaped. + * + * ### Bearer-leak hardening + * + * The embedded [client] reference is intentionally omitted from + * [toString] — including it would risk leaking the bearer token into logs + * (see [AppToken.toString] for the parallel pattern). + * + * @property id server-assigned session id (also acpx's id). + * @property agent agent slug this session is bound to. + * @property createdAt unix-second creation timestamp recorded by the server. + */ +public class Session internal constructor( + private val client: ForgeClient, + public val id: String, + public val agent: String, + public val createdAt: Long, +) : Closeable { + + private val closedFlag = AtomicBoolean(false) + private val turnMutex = Mutex() + + /** + * `true` once [close] has succeeded at least once on this client. Note + * this is the client's view: the server may have reaped the session + * out from under us (TTL sweeper, manual revoke) without our knowing. + */ + public val isClosed: Boolean get() = closedFlag.get() + + /** + * Sends one turn on this session. + * + * Concurrent calls on the same [Session] instance are serialised via an + * internal mutex — the second caller suspends until the first one + * returns. The server is therefore never asked to handle two turns on + * the same session at the same time. + * + * @param prompt prompt text. Must not be empty. + * @param files optional list of `ff_…` file tokens to attach. + * `null` or empty omits the field. + * @param timeoutSecs optional per-turn subprocess budget in seconds. + * Tracks the audit-fix [pattern](file://3c77ef5): + * overrides the default HTTP request timeout so a + * value larger than [ForgeOptions.defaultTimeout] + * does not produce an HTTP-disconnect mid-turn. + * @throws IllegalStateException if [isClosed] is `true`. + * @throws IllegalArgumentException if [prompt] is empty. + * @throws ForgeAuthException on HTTP 401/403. + * @throws ForgeApiException on any other non-2xx response. + * @throws ForgeTransportException on transport / decoding failure. + */ + public suspend fun turn( + prompt: String, + files: List? = null, + timeoutSecs: Int? = null, + ): TurnResult { + check(!isClosed) { "session $id is closed" } + require(prompt.isNotEmpty()) { "prompt must not be empty" } + return turnMutex.withLock { + client.sessionTurnInternal(id, prompt, files, timeoutSecs) + } + } + + /** + * Fetches the current server-side state for this session — equivalent + * to `forge.getSession(id)`. + */ + public suspend fun state(): SessionState = client.getSession(id) + + /** + * Closes the server-side session. + * + * Idempotent on the client: a second call short-circuits without an + * HTTP round-trip via [AtomicBoolean.compareAndSet]. The server's + * `DELETE /sessions/{id}` is itself idempotent (returns + * `already_closed: true` on a re-close), so even concurrent closes + * degrade gracefully. + * + * If the DELETE fails on the first attempt the closed flag is cleared + * so a subsequent call can retry. + * + * Implemented synchronously via [runBlocking] because [Closeable.close] + * is not a `suspend` function — the body is one DELETE so the blocking + * call is bounded by the configured request timeout. + */ + override fun close() { + if (closedFlag.compareAndSet(false, true)) { + try { + runBlocking { client.sessionCloseInternal(id) } + } catch (e: Throwable) { + closedFlag.set(false) // allow retry on transient failure + throw e + } + } + } + + /** + * Renders this session **without** the embedded [client] reference, + * which holds the bearer token. Same hazard pattern as + * [AppToken.toString] — the standard data-class auto-`toString` would + * print every component. + */ + override fun toString(): String = + "Session(id=$id, agent=$agent, closed=${closedFlag.get()})" +} diff --git a/clients/kotlin/src/test/kotlin/com/clawdforge/SessionTest.kt b/clients/kotlin/src/test/kotlin/com/clawdforge/SessionTest.kt new file mode 100644 index 0000000..8ec7537 --- /dev/null +++ b/clients/kotlin/src/test/kotlin/com/clawdforge/SessionTest.kt @@ -0,0 +1,576 @@ +package com.clawdforge + +import io.ktor.client.engine.mock.MockEngine +import io.ktor.client.engine.mock.MockRequestHandleScope +import io.ktor.client.engine.mock.respond +import io.ktor.client.engine.mock.respondError +import io.ktor.client.request.HttpRequestData +import io.ktor.http.HttpHeaders +import io.ktor.http.HttpMethod +import io.ktor.http.HttpStatusCode +import io.ktor.http.headersOf +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.test.runTest +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.JsonPrimitive +import kotlinx.serialization.json.buildJsonObject +import kotlinx.serialization.json.jsonObject +import kotlinx.serialization.json.jsonPrimitive +import kotlinx.serialization.json.put +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.Test +import kotlin.test.assertContains +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertFalse +import kotlin.test.assertNotNull +import kotlin.test.assertNull +import kotlin.test.assertTrue +import kotlin.time.Duration.Companion.seconds + +/** + * v0.2 multi-turn / Session API tests. All interactions go through Ktor's + * [MockEngine] — no real network. + */ +class SessionTest { + + private val jsonContentType = headersOf(HttpHeaders.ContentType, "application/json") + + /** Per-test request hit counts keyed by `" "`. */ + private class Hits { + private val map = mutableMapOf() + + operator fun get(key: String): Int = map[key]?.get() ?: 0 + + fun bump(key: String) { + map.getOrPut(key) { AtomicInteger() }.incrementAndGet() + } + } + + private fun client( + defaultTimeout: kotlin.time.Duration = 60.seconds, + requestMargin: kotlin.time.Duration = 30.seconds, + token: String = "cf_test", + handler: suspend MockRequestHandleScope.(HttpRequestData) -> io.ktor.client.request.HttpResponseData, + ): ForgeClient { + val engine = MockEngine { req -> handler(req) } + return ForgeClient( + baseUrl = "http://forge.test", + token = token, + options = ForgeOptions( + engine = engine, + defaultTimeout = defaultTimeout, + requestMargin = requestMargin, + ), + ) + } + + /** + * Wires up a minimal /sessions surface with hit counters keyed by + * route pattern. Returns the [Hits] instance for assertions. + */ + private fun minimalSessionRoutes( + sessionId: String = "sess_abc", + ): Pair io.ktor.client.request.HttpResponseData> { + val hits = Hits() + val turnIdx = AtomicInteger() + val handler: suspend MockRequestHandleScope.(HttpRequestData) -> io.ktor.client.request.HttpResponseData = + { req -> + val path = req.url.encodedPath + when { + req.method == HttpMethod.Post && path == "/sessions" -> { + hits.bump("POST /sessions") + respond( + """{"session_id":"$sessionId","agent":"claude","created_at":1700000000}""", + HttpStatusCode.OK, + jsonContentType, + ) + } + req.method == HttpMethod.Get && path == "/sessions" -> { + hits.bump("GET /sessions") + respond("""{"sessions":[]}""", HttpStatusCode.OK, jsonContentType) + } + req.method == HttpMethod.Post && path.endsWith("/turn") -> { + hits.bump("POST /sessions/{id}/turn") + val idx = turnIdx.getAndIncrement() + respond( + """{"ok":true,"session_id":"$sessionId","turn_index":$idx,""" + + """"events":[{"type":"text","content":"hi"}],""" + + """"stop_reason":"end_turn","duration_ms":42}""", + HttpStatusCode.OK, + jsonContentType, + ) + } + req.method == HttpMethod.Delete && path.startsWith("/sessions/") -> { + hits.bump("DELETE /sessions/{id}") + respond("""{"ok":true}""", HttpStatusCode.OK, jsonContentType) + } + req.method == HttpMethod.Get && path.startsWith("/sessions/") -> { + hits.bump("GET /sessions/{id}") + respond( + """{"session_id":"$sessionId","agent":"claude","app_name":"app1",""" + + """"created_at":1700000000,"last_turn_at":null,""" + + """"turn_count":0,"closed_at":null}""", + HttpStatusCode.OK, + jsonContentType, + ) + } + else -> respondError(HttpStatusCode.MethodNotAllowed) + } + } + return hits to handler + } + + // ------------------------------------------------------------------ + + @Test + fun sessionBlockHelperAutoCloses() = runTest { + val (hits, handler) = minimalSessionRoutes("sess_auto") + client(handler = handler).use { c -> + c.session { s -> + assertEquals("sess_auto", s.id) + assertEquals("claude", s.agent) + assertEquals(1700000000L, s.createdAt) + assertFalse(s.isClosed) + val r = s.turn("hi") + assertTrue(r.ok) + } + } + assertEquals(1, hits["POST /sessions"]) + assertEquals(1, hits["POST /sessions/{id}/turn"]) + assertEquals(1, hits["DELETE /sessions/{id}"]) + } + + @Test + fun sessionBlockHelperClosesOnException() = runTest { + val (hits, handler) = minimalSessionRoutes("sess_throw") + val ex = assertFailsWith { + client(handler = handler).use { c -> + c.session { _ -> + error("boom") + } + } + } + assertEquals("boom", ex.message) + assertEquals(1, hits["POST /sessions"]) + assertEquals( + 1, + hits["DELETE /sessions/{id}"], + "DELETE must fire on exception unwind", + ) + } + + @Test + fun sessionCloseIdempotent() = runTest { + val (hits, handler) = minimalSessionRoutes("sess_idem") + client(handler = handler).use { c -> + val s = c.createSession() + s.close() + assertTrue(s.isClosed) + s.close() // short-circuit + s.close() // and again + assertEquals( + 1, + hits["DELETE /sessions/{id}"], + "DELETE must hit ONCE across multiple close() calls", + ) + } + } + + @Test + fun sessionTurnRoundTrip() = runTest { + var sentBody: String? = null + var sentPath: String? = null + val c = client { req -> + val path = req.url.encodedPath + when { + req.method == HttpMethod.Post && path == "/sessions" -> respond( + """{"session_id":"sess_rt","agent":"claude","created_at":1}""", + HttpStatusCode.OK, + jsonContentType, + ) + req.method == HttpMethod.Post && path.endsWith("/turn") -> { + sentPath = path + sentBody = (req.body as io.ktor.http.content.TextContent).text + respond( + """{"ok":true,"session_id":"sess_rt","turn_index":2,""" + + """"events":[""" + + """{"type":"thinking","content":"..."},""" + + """{"type":"tool_call","name":"Read",""" + + """"args":{"path":"x"},"result":"ok"},""" + + """{"type":"text","content":"hello"},""" + + """{"type":"text","content":" world"}""" + + """],"stop_reason":"end_turn","duration_ms":123}""", + HttpStatusCode.OK, + jsonContentType, + ) + } + req.method == HttpMethod.Delete -> respond( + """{"ok":true}""", + HttpStatusCode.OK, + jsonContentType, + ) + else -> respondError(HttpStatusCode.MethodNotAllowed) + } + } + c.use { client -> + client.session { s -> + val r = s.turn(prompt = "Read README", files = listOf("ff_abc", "ff_def")) + assertTrue(r.ok) + assertEquals("sess_rt", r.sessionId) + assertEquals(2, r.turnIndex) + assertEquals("end_turn", r.stopReason) + assertEquals(123L, r.durationMs) + assertEquals(4, r.events.size) + assertEquals("hello world", r.text()) + } + } + assertNotNull(sentBody) + assertContains(sentBody!!, "\"prompt\":\"Read README\"") + assertContains(sentBody!!, "\"files\":[\"ff_abc\",\"ff_def\"]") + assertEquals("/sessions/sess_rt/turn", sentPath) + } + + @Test + fun sessionTurnAfterCloseThrows() = runTest { + val (_, handler) = minimalSessionRoutes("sess_dead") + client(handler = handler).use { c -> + val s = c.createSession() + s.close() + val ex = assertFailsWith { + s.turn("anybody home?") + } + assertContains(ex.message ?: "", "closed") + } + } + + @Test + fun sessionTurnHttpTimeoutHonorsPerCallTimeoutSecs() = runTest { + // Audit-fix 3c77ef5 regression mirror: per-call timeout overrides + // the default. Global floor: defaultTimeout=5s + requestMargin=1s = 6s. + // Server delays 8s before responding; caller passes timeoutSecs=20. + // Without the per-call override: HttpRequestTimeoutException at 6s. + // With the override: succeeds. + val c = client(defaultTimeout = 5.seconds, requestMargin = 1.seconds) { req -> + val path = req.url.encodedPath + when { + req.method == HttpMethod.Post && path == "/sessions" -> respond( + """{"session_id":"sess_to","agent":"claude","created_at":1}""", + HttpStatusCode.OK, + jsonContentType, + ) + req.method == HttpMethod.Post && path.endsWith("/turn") -> { + delay(8.seconds) + respond( + """{"ok":true,"session_id":"sess_to","turn_index":0,""" + + """"events":[{"type":"text","content":"done"}],""" + + """"stop_reason":"end_turn","duration_ms":8000}""", + HttpStatusCode.OK, + jsonContentType, + ) + } + req.method == HttpMethod.Delete -> respond( + """{"ok":true}""", + HttpStatusCode.OK, + jsonContentType, + ) + else -> respondError(HttpStatusCode.MethodNotAllowed) + } + } + c.use { client -> + client.session { s -> + val r = s.turn("long-running", timeoutSecs = 20) + assertEquals("done", r.text()) + } + } + } + + @Test + fun turnResultTextConcatenates() { + val r = TurnResult( + ok = true, + sessionId = "s", + turnIndex = 0, + events = listOf( + TurnEvent(type = "thinking", content = "ignored"), + TurnEvent(type = "text", content = "alpha "), + TurnEvent( + type = "tool_call", + name = "Read", + args = buildJsonObject { put("p", "x") }, + result = JsonPrimitive("result"), + ), + TurnEvent(type = "text", content = "beta"), + TurnEvent(type = "text", content = null), // skipped + ), + stopReason = "end_turn", + durationMs = 10L, + ) + assertEquals("alpha beta", r.text()) + } + + @Test + fun listSessions() = runTest { + val c = client { req -> + assertEquals(HttpMethod.Get, req.method) + assertEquals("/sessions", req.url.encodedPath) + respond( + """{"sessions":[""" + + """{"session_id":"a","agent":"claude","app_name":"app1",""" + + """"created_at":100,"last_turn_at":150,"turn_count":3,""" + + """"closed_at":null},""" + + """{"session_id":"b","agent":"claude","app_name":"app1",""" + + """"created_at":200,"last_turn_at":null,"turn_count":0,""" + + """"closed_at":250}""" + + """]}""", + HttpStatusCode.OK, + jsonContentType, + ) + } + c.use { + val ss = it.listSessions() + assertEquals(2, ss.size) + assertEquals("a", ss[0].sessionId) + assertEquals("app1", ss[0].appName) + assertEquals(150L, ss[0].lastTurnAt) + assertNull(ss[0].closedAt) + assertEquals(3, ss[0].turnCount) + assertEquals("b", ss[1].sessionId) + assertNull(ss[1].lastTurnAt) + assertEquals(250L, ss[1].closedAt) + } + } + + @Test + fun getSession() = runTest { + val c = client { req -> + assertEquals(HttpMethod.Get, req.method) + assertEquals("/sessions/sess_g", req.url.encodedPath) + respond( + """{"session_id":"sess_g","agent":"claude","app_name":"app1",""" + + """"created_at":100,"last_turn_at":120,"turn_count":1,""" + + """"closed_at":null}""", + HttpStatusCode.OK, + jsonContentType, + ) + } + c.use { + val st = it.getSession("sess_g") + assertEquals("sess_g", st.sessionId) + assertEquals("claude", st.agent) + assertEquals("app1", st.appName) + assertEquals(100L, st.createdAt) + assertEquals(120L, st.lastTurnAt) + assertEquals(1, st.turnCount) + assertNull(st.closedAt) + } + } + + @Test + fun crossTokenIs404() = runTest { + val c = client(token = "cf_other") { req -> + assertEquals(HttpMethod.Get, req.method) + respondError(HttpStatusCode.NotFound, """{"detail":"no such session"}""") + } + c.use { + val ex = assertFailsWith { + it.getSession("sess_owned_by_a") + } + assertEquals(404, ex.statusCode) + assertContains(ex.body, "no such session") + } + } + + @Test + fun sessionToStringDoesNotLeakToken() = runTest { + val (_, handler) = minimalSessionRoutes("sess_redact") + val c = ForgeClient( + baseUrl = "http://forge.test", + token = "cf_super_secret_do_not_log", + options = ForgeOptions(engine = MockEngine { req -> handler(req) }), + ) + c.use { client -> + client.session { s -> + val repr = s.toString() + assertFalse( + repr.contains("cf_super_secret_do_not_log"), + "Session.toString() must not include the bearer; got: $repr", + ) + assertFalse( + repr.contains("ForgeClient"), + "Session.toString() should not embed the client; got: $repr", + ) + assertFalse( + repr.lowercase().contains("token"), + "Session.toString() should not even hint at a token field; got: $repr", + ) + assertContains(repr, "id=sess_redact") + assertContains(repr, "closed=false") + } + } + } + + @Test + fun concurrentTurnsAreSerialized() = runTest { + // Two coroutines call s.turn() concurrently. The mutex on Session + // must serialize them so the server only ever sees one in-flight + // turn at a time. We assert: max-observed in-flight == 1, and both + // turns complete. + val inFlight = AtomicInteger() + val maxInFlight = AtomicInteger() + val firstTurnReleased = CompletableDeferred() + val firstTurnEntered = CompletableDeferred() + val turnCounter = AtomicInteger() + + val c = client { req -> + val path = req.url.encodedPath + when { + req.method == HttpMethod.Post && path == "/sessions" -> respond( + """{"session_id":"sess_par","agent":"claude","created_at":1}""", + HttpStatusCode.OK, + jsonContentType, + ) + req.method == HttpMethod.Post && path.endsWith("/turn") -> { + val now = inFlight.incrementAndGet() + maxInFlight.updateAndGet { kotlin.math.max(it, now) } + val idx = turnCounter.getAndIncrement() + if (idx == 0) { + // First turn: signal entry, then block until released. + firstTurnEntered.complete(Unit) + firstTurnReleased.await() + } + inFlight.decrementAndGet() + respond( + """{"ok":true,"session_id":"sess_par","turn_index":$idx,""" + + """"events":[{"type":"text","content":"r$idx"}],""" + + """"stop_reason":"end_turn","duration_ms":1}""", + HttpStatusCode.OK, + jsonContentType, + ) + } + req.method == HttpMethod.Delete -> respond( + """{"ok":true}""", + HttpStatusCode.OK, + jsonContentType, + ) + else -> respondError(HttpStatusCode.MethodNotAllowed) + } + } + + c.use { client -> + client.session { s -> + coroutineScope { + val a = async { s.turn("first") } + // Wait until the first turn has actually entered the + // mock handler before kicking off the second; otherwise + // we could race past the mutex acquisition itself. + firstTurnEntered.await() + val b = async { s.turn("second") } + // Now release the first; the second is queued behind it. + firstTurnReleased.complete(Unit) + val results = awaitAll(a, b) + assertEquals("r0", results[0].text()) + assertEquals("r1", results[1].text()) + } + } + } + assertEquals(2, turnCounter.get()) + assertEquals( + 1, + maxInFlight.get(), + "concurrent turns on a Session must be serialized via the per-session mutex", + ) + } + + @Test + fun createSessionSendsAgentAndMeta() = runTest { + var sentBody: String? = null + val c = client { req -> + assertEquals(HttpMethod.Post, req.method) + assertEquals("/sessions", req.url.encodedPath) + sentBody = (req.body as io.ktor.http.content.TextContent).text + respond( + """{"session_id":"sess_meta","agent":"claude","created_at":1}""", + HttpStatusCode.OK, + jsonContentType, + ) + } + c.use { + val opts = SessionOptions( + agent = "claude", + meta = buildJsonObject { put("trace", "abc-123") }, + ) + val s = it.createSession(opts) + assertEquals("sess_meta", s.id) + } + assertNotNull(sentBody) + assertContains(sentBody!!, "\"agent\":\"claude\"") + assertContains(sentBody!!, "\"trace\":\"abc-123\"") + } + + @Test + fun closeOnTransientFailureClearsFlag() = runTest { + val deletes = AtomicInteger() + val c = client { req -> + val path = req.url.encodedPath + when { + req.method == HttpMethod.Post && path == "/sessions" -> respond( + """{"session_id":"sess_retry","agent":"claude","created_at":1}""", + HttpStatusCode.OK, + jsonContentType, + ) + req.method == HttpMethod.Delete -> { + val n = deletes.incrementAndGet() + if (n == 1) { + respondError(HttpStatusCode.InternalServerError, """{"detail":"transient"}""") + } else { + respond("""{"ok":true}""", HttpStatusCode.OK, jsonContentType) + } + } + else -> respondError(HttpStatusCode.MethodNotAllowed) + } + } + c.use { client -> + val s = client.createSession() + assertFailsWith { s.close() } + assertFalse(s.isClosed, "transient failure must clear the closed flag for retry") + // Retry succeeds. + s.close() + assertTrue(s.isClosed) + assertEquals(2, deletes.get()) + } + } + + @Test + fun v01RunUnchanged() = runTest { + // Regression: /run on a v0.2-equipped client must behave exactly + // as v0.1, including snake_case body shape and JsonElement result. + var captured: String? = null + val c = client { req -> + assertEquals(HttpMethod.Post, req.method) + assertEquals("/run", req.url.encodedPath) + captured = (req.body as io.ktor.http.content.TextContent).text + respond( + """{"ok":true,"result":{"hello":"world"},""" + + """"duration_ms":1234,"stop_reason":"end_turn"}""", + HttpStatusCode.OK, + jsonContentType, + ) + } + c.use { + val r = it.run(RunRequest(prompt = "hi", model = "sonnet", timeoutSecs = 60)) + assertTrue(r.ok) + assertEquals(1234L, r.durationMs) + assertEquals("end_turn", r.stopReason) + val obj: JsonObject = r.result.jsonObject + assertEquals("world", (obj["hello"] as JsonPrimitive).content) + } + assertNotNull(captured) + assertContains(captured!!, "\"timeout_secs\":60") + assertContains(captured!!, "\"model\":\"sonnet\"") + } +}