clients/kotlin: v0.2 multi-turn Session API

- Session : Closeable; AtomicBoolean idempotent close (rollback on transient)
- forge.session(opts) { s -> ... } block helper preferred
- ForgeClient.createSession / listSessions / getSession
- Per-call HTTP timeout on /sessions/{id}/turn (audit-fix 3c77ef5 pattern)
- Per-session Mutex serializes concurrent turns
- TurnResult.text() helper, Session.toString redacts client
- SessionTest.kt: ~14 tests covering block/idempotency/concurrent/timeout/list/state/404/redaction/regression
- README "Multi-turn / Sessions (v0.2)" section

v0.1 surface unchanged. Ktor 2.3.13 preserved.

Spec: memory/spec-clawdforge-v0.2.md
Server core: 940861f
This commit is contained in:
Kayos 2026-04-29 07:03:39 -07:00
parent 0f091771d3
commit 8479725513
5 changed files with 1099 additions and 2 deletions

View file

@ -25,7 +25,7 @@ repositories {
} }
dependencies { dependencies {
implementation("com.clawdforge:clawdforge:0.1.0") implementation("com.clawdforge:clawdforge:0.2.0")
} }
``` ```
@ -68,6 +68,96 @@ suspend fun main() {
} }
``` ```
## Multi-turn / Sessions (v0.2)
v0.2 adds a parallel `/sessions/*` surface for multi-turn conversations.
v0.1's `/run` still works exactly as before — sessions are purely additive.
The preferred entry point is the `session { … }` block helper, which
auto-closes the server-side session on block exit, even on throw:
```kotlin
import com.clawdforge.ForgeClient
import com.clawdforge.SessionOptions
suspend fun main() {
val forge = ForgeClient("http://localhost:8800", System.getenv("CLAWDFORGE_TOKEN"))
forge.use {
forge.session(SessionOptions(agent = "claude")) { s ->
val r1 = s.turn("Read README.md and summarize")
println(r1.text())
val r2 = s.turn("Now look at the auth flow", files = listOf("ff_xyz"))
println(r2.text())
}
// session is closed here; DELETE /sessions/{id} has fired
}
}
```
`Session` implements `java.io.Closeable`, so it also composes with `use`:
```kotlin
forge.createSession().use { s ->
val r = s.turn("hello")
println(r.text())
} // auto-closed
```
For long-lived handles you can drive the lifecycle yourself:
```kotlin
val s = forge.createSession(SessionOptions(agent = "claude"))
try {
val r = s.turn("hello")
} finally {
s.close()
}
```
Listing and inspection:
```kotlin
val sessions: List<SessionState> = forge.listSessions()
val state: SessionState = forge.getSession(s.id)
```
### Behaviours worth flagging
- **Idempotent close.** `Session.close()` only fires `DELETE /sessions/{id}`
the first time it's called; later invocations short-circuit on an
internal `AtomicBoolean`. If the DELETE fails on the first attempt the
flag is rolled back so a transient transport blip can be retried.
- **Concurrent turns are serialized.** Two coroutines calling `s.turn()`
on the same `Session` won't race on the wire — an internal coroutine
`Mutex` orders them, so the server only ever sees one in-flight turn
per session. (Multiple sessions, of course, run in parallel.)
- **Per-call HTTP timeout.** `Session.turn(prompt, files, timeoutSecs)`
applies the same per-call timeout pattern as audit-fix `3c77ef5` for
`ForgeClient.run`: a `timeoutSecs` larger than `ForgeOptions.defaultTimeout`
extends the HTTP request window so the client doesn't disconnect while
the server is still doing useful work.
- **`Session.toString()` redacts the embedded client.** The `ForgeClient`
reference (which carries the bearer) is intentionally omitted. Same
hazard / same pattern as `AppToken.toString`.
- **Cross-token access is 404.** `forge.getSession(id)` against a session
owned by another token surfaces as `ForgeApiException(statusCode=404)`
— the server returns 404 (not 403) to avoid leaking session existence.
### Turn output
`TurnResult.events` is the structured event list (text, thinking,
tool_call, …). For the common case where you want the model's textual
reply, use `result.text()`:
```kotlin
val r = s.turn("Explain in two sentences")
println(r.text()) // concatenated content of every type=="text" event
```
`TurnEvent.args` and `TurnEvent.result` are `JsonElement?` — narrow them
with the standard `kotlinx.serialization.json` extensions.
## Public API ## Public API
All methods are `suspend` — call them from a coroutine scope. All methods are `suspend` — call them from a coroutine scope.
@ -80,6 +170,10 @@ All methods are `suspend` — call them from a coroutine scope.
| `createToken(CreateTokenRequest)` | `POST /admin/tokens` | Admin bootstrap token only. | | `createToken(CreateTokenRequest)` | `POST /admin/tokens` | Admin bootstrap token only. |
| `listTokens()` | `GET /admin/tokens` | Admin only. | | `listTokens()` | `GET /admin/tokens` | Admin only. |
| `revokeToken(name)` | `DELETE /admin/tokens/{name}` | Admin only. | | `revokeToken(name)` | `DELETE /admin/tokens/{name}` | Admin only. |
| `createSession(SessionOptions)` | `POST /sessions` | v0.2; returns a `Session`. |
| `session(SessionOptions) { s -> … }` | — | v0.2 block helper; auto-closes. |
| `listSessions()` | `GET /sessions` | v0.2; returns `List<SessionState>`. |
| `getSession(id)` | `GET /sessions/{id}` | v0.2; cross-token = 404. |
| `close()` | — | Disposes the underlying Ktor client. | | `close()` | — | Disposes the underlying Ktor client. |
### Constructor ### Constructor

View file

@ -6,7 +6,7 @@ plugins {
} }
group = "com.clawdforge" group = "com.clawdforge"
version = "0.1.0" version = "0.2.0"
repositories { repositories {
mavenCentral() mavenCentral()

View file

@ -254,6 +254,154 @@ public class ForgeClient @JvmOverloads public constructor(
} }
} }
// ------------------------------------------------------------------
// v0.2: multi-turn sessions
// ------------------------------------------------------------------
/**
* `POST /sessions`. Creates a server-side session and returns a
* [Session] handle bound to this client.
*
* Prefer the [session] block helper, which auto-closes the returned
* handle even when the body throws. Use this method directly when you
* need a session whose lifetime crosses suspending boundaries that
* make a block helper inconvenient wrap in `try { } finally {
* s.close() }` (or `s.use { }`) to avoid leaking server-side state.
*
* @param options agent / metadata; defaults are fine for the common case.
*/
public suspend fun createSession(options: SessionOptions = SessionOptions()): Session =
wrapTransport("POST /sessions") {
val resp = http.post("$baseUrl/sessions") {
authHeader()
contentType(ContentType.Application.Json)
setBody(SessionCreateBody(agent = options.agent, meta = options.meta))
}
val body = decodeOrThrow<SessionCreateResponse>(resp)
Session(
client = this,
id = body.sessionId,
agent = body.agent,
createdAt = body.createdAt,
)
}
/**
* Block-helper around [createSession] / [Session.close]. Creates a
* session, runs [block] with it, and closes the server-side session
* even when [block] throws:
*
* ```kotlin
* forge.session(SessionOptions(agent = "claude")) { s ->
* val r = s.turn("hello")
* println(r.text())
* }
* ```
*
* The close exception (if any) is swallowed the body's outcome is
* what propagates. If you want to observe close failures, use
* [createSession] + `try/finally` directly.
*/
public suspend fun <T> session(
options: SessionOptions = SessionOptions(),
block: suspend (Session) -> T,
): T {
val s = createSession(options)
return try {
block(s)
} finally {
try {
s.close()
} catch (ce: CancellationException) {
throw ce
} catch (_: Throwable) {
// Mirror the audit pattern: don't override an in-flight
// body exception with a close-time failure. Callers that
// need close-failure visibility can drive createSession()
// + try/finally themselves.
}
}
}
/**
* `GET /sessions`. Lists the calling token's sessions (newest-first,
* per the server contract).
*/
public suspend fun listSessions(): List<SessionState> = wrapTransport("GET /sessions") {
val resp = http.get("$baseUrl/sessions") { authHeader() }
decodeOrThrow<SessionList>(resp).sessions
}
/**
* `GET /sessions/{id}`. A session belonging to a different token
* surfaces as [ForgeApiException] with `statusCode = 404` the
* server intentionally returns 404 (not 403) for cross-token access
* to avoid leaking session existence.
*/
public suspend fun getSession(id: String): SessionState {
require(id.isNotEmpty()) { "id must not be empty" }
return wrapTransport("GET /sessions/{id}") {
val resp = http.get("$baseUrl/sessions/${id.encodeURLPathPart()}") {
authHeader()
}
decodeOrThrow(resp)
}
}
/**
* Internal: `POST /sessions/{id}/turn`. Don't call directly go
* through [Session.turn], which holds the per-session mutex.
*
* Applies the audit-fix `3c77ef5` per-call HTTP timeout pattern: a
* caller-supplied `timeoutSecs` larger than [ForgeOptions.defaultTimeout]
* extends the HTTP request window so we don't disconnect while the
* server is still doing useful work.
*/
internal suspend fun sessionTurnInternal(
id: String,
prompt: String,
files: List<String>?,
timeoutSecs: Int?,
): TurnResult {
val effectiveTimeoutSecs = timeoutSecs
?: options.defaultTimeout.inWholeSeconds.toInt()
val callTimeoutMs = (effectiveTimeoutSecs * 1000L) + options.requestMargin.inWholeMilliseconds
val encodedId = id.encodeURLPathPart()
return wrapTransport("POST /sessions/{id}/turn") {
val resp = http.post("$baseUrl/sessions/$encodedId/turn") {
timeout {
requestTimeoutMillis = callTimeoutMs
socketTimeoutMillis = callTimeoutMs
}
authHeader()
contentType(ContentType.Application.Json)
setBody(
SessionTurnBody(
prompt = prompt,
files = files?.takeIf { it.isNotEmpty() },
timeoutSecs = timeoutSecs,
),
)
}
decodeOrThrow(resp)
}
}
/**
* Internal: `DELETE /sessions/{id}`. Don't call directly go through
* [Session.close], which guards with an [java.util.concurrent.atomic.AtomicBoolean]
* so the DELETE only fires once per session handle.
*/
internal suspend fun sessionCloseInternal(id: String) {
val encodedId = id.encodeURLPathPart()
wrapTransport("DELETE /sessions/{id}") {
val resp = http.delete("$baseUrl/sessions/$encodedId") { authHeader() }
if (!resp.status.isSuccess()) {
throwForStatus(resp)
}
}
}
/** Disposes the underlying Ktor client. After this, the instance is unusable. */ /** Disposes the underlying Ktor client. After this, the instance is unusable. */
override fun close() { override fun close() {
http.close() http.close()

View file

@ -0,0 +1,279 @@
package com.clawdforge
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonElement
import java.io.Closeable
import java.util.concurrent.atomic.AtomicBoolean
/**
* Options for [ForgeClient.createSession] / [ForgeClient.session].
*
* Pass an empty instance for defaults:
*
* ```kotlin
* forge.session { s -> s.turn("hi") } // defaults
* forge.session(SessionOptions(agent = "claude")) { s -> ... } // explicit
* ```
*
* @property agent agent slug the server should bind this session to.
* Defaults to `"claude"`.
* @property meta opaque per-session metadata; serialised verbatim under the
* `meta` key in the create-body. `null` is omitted from the wire.
*/
public data class SessionOptions(
public val agent: String = "claude",
public val meta: JsonElement? = null,
)
/**
* One structured event emitted by a turn.
*
* The server collects these from acpx as the agent runs and returns them
* as a complete batch when the turn ends. Each event has at minimum a
* [type] (`"text"`, `"thinking"`, `"tool_call"`, ...); the type-specific
* payload fields are pass-through and may be absent for non-applicable
* types.
*
* Forward-compat: unknown types decode normally with absent fields left
* `null`. Callers should switch on [type] and defensively handle `null`
* payload fields.
*
* @property type event type, e.g. `"text"`, `"thinking"`, `"tool_call"`.
* @property content text content for `text` / `thinking` events; null otherwise.
* @property name tool name for `tool_call` events; null otherwise.
* @property args tool-call arguments for `tool_call` events; null otherwise.
* @property result tool-call result for `tool_call` events; null otherwise.
*/
@Serializable
public data class TurnEvent(
public val type: String,
public val content: String? = null,
public val name: String? = null,
public val args: JsonElement? = null,
public val result: JsonElement? = null,
)
/**
* Parsed body of `POST /sessions/{id}/turn`.
*
* Use [text] to concatenate the `text` events into a single string for the
* common case where you just want "what did the model say?".
*
* @property ok true on a successful turn.
* @property sessionId server session id (also acpx's id).
* @property turnIndex zero-based turn ordinal within this session.
* @property events structured events emitted by the turn, in order.
* @property stopReason reason the turn ended (`"end_turn"`, `"max_tokens"`, ).
* @property durationMs server-side wall-clock duration in milliseconds.
*/
@Serializable
public data class TurnResult(
public val ok: Boolean,
@SerialName("session_id") public val sessionId: String,
@SerialName("turn_index") public val turnIndex: Int,
public val events: List<TurnEvent>,
@SerialName("stop_reason") public val stopReason: String,
@SerialName("duration_ms") public val durationMs: Long,
) {
/**
* Concatenates the [TurnEvent.content] of every `type == "text"` event
* in [events] order. Skips events whose content is `null`.
*/
public fun text(): String =
events.asSequence()
.filter { it.type == "text" }
.mapNotNull { it.content }
.joinToString("")
}
/**
* Server's view of one session returned by `GET /sessions/{id}` and as
* one row of `GET /sessions`.
*
* @property sessionId server session id.
* @property agent agent slug this session is bound to.
* @property appName name of the app token that owns the session; may be
* absent on responses where the server elects not to
* include it.
* @property createdAt unix-second creation timestamp.
* @property lastTurnAt unix-second timestamp of the most recent turn, or
* `null` if no turn has been sent yet.
* @property turnCount number of turns sent so far.
* @property closedAt unix-second close timestamp, or `null` if the
* session is still open.
*/
@Serializable
public data class SessionState(
@SerialName("session_id") public val sessionId: String,
public val agent: String,
@SerialName("app_name") public val appName: String? = null,
@SerialName("created_at") public val createdAt: Long,
@SerialName("last_turn_at") public val lastTurnAt: Long? = null,
@SerialName("turn_count") public val turnCount: Int,
@SerialName("closed_at") public val closedAt: Long? = null,
)
/** Wrapper for `GET /sessions`. */
@Serializable
internal data class SessionList(
val sessions: List<SessionState> = emptyList(),
)
/** Wrapper for `POST /sessions`. */
@Serializable
internal data class SessionCreateBody(
val agent: String? = null,
val meta: JsonElement? = null,
)
/** Wrapper for `POST /sessions` response. */
@Serializable
internal data class SessionCreateResponse(
@SerialName("session_id") val sessionId: String,
val agent: String = "claude",
@SerialName("created_at") val createdAt: Long = 0,
)
/** Wire body for `POST /sessions/{id}/turn`. */
@Serializable
internal data class SessionTurnBody(
val prompt: String,
val files: List<String>? = null,
@SerialName("timeout_secs") val timeoutSecs: Int? = null,
)
/**
* Handle to a server-side multi-turn session.
*
* Construct via [ForgeClient.createSession] (or grab one from inside the
* [ForgeClient.session] block helper, which auto-closes on exit).
*
* Implements [Closeable], so it composes with `use { }`:
*
* ```kotlin
* forge.createSession().use { s ->
* val r = s.turn("hello")
* println(r.text())
* }
* ```
*
* ### Thread safety
*
* [turn] is `suspend` and serialises concurrent calls on the same instance
* via an internal [Mutex] two coroutines calling `turn()` at once will
* be ordered (FIFO with respect to mutex acquisition), so the server only
* ever sees one in-flight turn per session at a time.
*
* [close] is idempotent: the first invocation flips an [AtomicBoolean] and
* issues `DELETE /sessions/{id}`; subsequent invocations short-circuit
* without an HTTP round-trip. If the DELETE fails on the first attempt the
* flag is rolled back so the caller can retry a transient transport blip
* shouldn't leave the client thinking a still-live session was reaped.
*
* ### Bearer-leak hardening
*
* The embedded [client] reference is intentionally omitted from
* [toString] including it would risk leaking the bearer token into logs
* (see [AppToken.toString] for the parallel pattern).
*
* @property id server-assigned session id (also acpx's id).
* @property agent agent slug this session is bound to.
* @property createdAt unix-second creation timestamp recorded by the server.
*/
public class Session internal constructor(
private val client: ForgeClient,
public val id: String,
public val agent: String,
public val createdAt: Long,
) : Closeable {
private val closedFlag = AtomicBoolean(false)
private val turnMutex = Mutex()
/**
* `true` once [close] has succeeded at least once on this client. Note
* this is the client's view: the server may have reaped the session
* out from under us (TTL sweeper, manual revoke) without our knowing.
*/
public val isClosed: Boolean get() = closedFlag.get()
/**
* Sends one turn on this session.
*
* Concurrent calls on the same [Session] instance are serialised via an
* internal mutex the second caller suspends until the first one
* returns. The server is therefore never asked to handle two turns on
* the same session at the same time.
*
* @param prompt prompt text. Must not be empty.
* @param files optional list of `ff_` file tokens to attach.
* `null` or empty omits the field.
* @param timeoutSecs optional per-turn subprocess budget in seconds.
* Tracks the audit-fix [pattern](file://3c77ef5):
* overrides the default HTTP request timeout so a
* value larger than [ForgeOptions.defaultTimeout]
* does not produce an HTTP-disconnect mid-turn.
* @throws IllegalStateException if [isClosed] is `true`.
* @throws IllegalArgumentException if [prompt] is empty.
* @throws ForgeAuthException on HTTP 401/403.
* @throws ForgeApiException on any other non-2xx response.
* @throws ForgeTransportException on transport / decoding failure.
*/
public suspend fun turn(
prompt: String,
files: List<String>? = null,
timeoutSecs: Int? = null,
): TurnResult {
check(!isClosed) { "session $id is closed" }
require(prompt.isNotEmpty()) { "prompt must not be empty" }
return turnMutex.withLock {
client.sessionTurnInternal(id, prompt, files, timeoutSecs)
}
}
/**
* Fetches the current server-side state for this session equivalent
* to `forge.getSession(id)`.
*/
public suspend fun state(): SessionState = client.getSession(id)
/**
* Closes the server-side session.
*
* Idempotent on the client: a second call short-circuits without an
* HTTP round-trip via [AtomicBoolean.compareAndSet]. The server's
* `DELETE /sessions/{id}` is itself idempotent (returns
* `already_closed: true` on a re-close), so even concurrent closes
* degrade gracefully.
*
* If the DELETE fails on the first attempt the closed flag is cleared
* so a subsequent call can retry.
*
* Implemented synchronously via [runBlocking] because [Closeable.close]
* is not a `suspend` function the body is one DELETE so the blocking
* call is bounded by the configured request timeout.
*/
override fun close() {
if (closedFlag.compareAndSet(false, true)) {
try {
runBlocking { client.sessionCloseInternal(id) }
} catch (e: Throwable) {
closedFlag.set(false) // allow retry on transient failure
throw e
}
}
}
/**
* Renders this session **without** the embedded [client] reference,
* which holds the bearer token. Same hazard pattern as
* [AppToken.toString] the standard data-class auto-`toString` would
* print every component.
*/
override fun toString(): String =
"Session(id=$id, agent=$agent, closed=${closedFlag.get()})"
}

View file

@ -0,0 +1,576 @@
package com.clawdforge
import io.ktor.client.engine.mock.MockEngine
import io.ktor.client.engine.mock.MockRequestHandleScope
import io.ktor.client.engine.mock.respond
import io.ktor.client.engine.mock.respondError
import io.ktor.client.request.HttpRequestData
import io.ktor.http.HttpHeaders
import io.ktor.http.HttpMethod
import io.ktor.http.HttpStatusCode
import io.ktor.http.headersOf
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.runTest
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.JsonPrimitive
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.jsonPrimitive
import kotlinx.serialization.json.put
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.Test
import kotlin.test.assertContains
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.seconds
/**
* v0.2 multi-turn / Session API tests. All interactions go through Ktor's
* [MockEngine] no real network.
*/
class SessionTest {
private val jsonContentType = headersOf(HttpHeaders.ContentType, "application/json")
/** Per-test request hit counts keyed by `"<METHOD> <path-pattern>"`. */
private class Hits {
private val map = mutableMapOf<String, AtomicInteger>()
operator fun get(key: String): Int = map[key]?.get() ?: 0
fun bump(key: String) {
map.getOrPut(key) { AtomicInteger() }.incrementAndGet()
}
}
private fun client(
defaultTimeout: kotlin.time.Duration = 60.seconds,
requestMargin: kotlin.time.Duration = 30.seconds,
token: String = "cf_test",
handler: suspend MockRequestHandleScope.(HttpRequestData) -> io.ktor.client.request.HttpResponseData,
): ForgeClient {
val engine = MockEngine { req -> handler(req) }
return ForgeClient(
baseUrl = "http://forge.test",
token = token,
options = ForgeOptions(
engine = engine,
defaultTimeout = defaultTimeout,
requestMargin = requestMargin,
),
)
}
/**
* Wires up a minimal /sessions surface with hit counters keyed by
* route pattern. Returns the [Hits] instance for assertions.
*/
private fun minimalSessionRoutes(
sessionId: String = "sess_abc",
): Pair<Hits, suspend MockRequestHandleScope.(HttpRequestData) -> io.ktor.client.request.HttpResponseData> {
val hits = Hits()
val turnIdx = AtomicInteger()
val handler: suspend MockRequestHandleScope.(HttpRequestData) -> io.ktor.client.request.HttpResponseData =
{ req ->
val path = req.url.encodedPath
when {
req.method == HttpMethod.Post && path == "/sessions" -> {
hits.bump("POST /sessions")
respond(
"""{"session_id":"$sessionId","agent":"claude","created_at":1700000000}""",
HttpStatusCode.OK,
jsonContentType,
)
}
req.method == HttpMethod.Get && path == "/sessions" -> {
hits.bump("GET /sessions")
respond("""{"sessions":[]}""", HttpStatusCode.OK, jsonContentType)
}
req.method == HttpMethod.Post && path.endsWith("/turn") -> {
hits.bump("POST /sessions/{id}/turn")
val idx = turnIdx.getAndIncrement()
respond(
"""{"ok":true,"session_id":"$sessionId","turn_index":$idx,""" +
""""events":[{"type":"text","content":"hi"}],""" +
""""stop_reason":"end_turn","duration_ms":42}""",
HttpStatusCode.OK,
jsonContentType,
)
}
req.method == HttpMethod.Delete && path.startsWith("/sessions/") -> {
hits.bump("DELETE /sessions/{id}")
respond("""{"ok":true}""", HttpStatusCode.OK, jsonContentType)
}
req.method == HttpMethod.Get && path.startsWith("/sessions/") -> {
hits.bump("GET /sessions/{id}")
respond(
"""{"session_id":"$sessionId","agent":"claude","app_name":"app1",""" +
""""created_at":1700000000,"last_turn_at":null,""" +
""""turn_count":0,"closed_at":null}""",
HttpStatusCode.OK,
jsonContentType,
)
}
else -> respondError(HttpStatusCode.MethodNotAllowed)
}
}
return hits to handler
}
// ------------------------------------------------------------------
@Test
fun sessionBlockHelperAutoCloses() = runTest {
val (hits, handler) = minimalSessionRoutes("sess_auto")
client(handler = handler).use { c ->
c.session { s ->
assertEquals("sess_auto", s.id)
assertEquals("claude", s.agent)
assertEquals(1700000000L, s.createdAt)
assertFalse(s.isClosed)
val r = s.turn("hi")
assertTrue(r.ok)
}
}
assertEquals(1, hits["POST /sessions"])
assertEquals(1, hits["POST /sessions/{id}/turn"])
assertEquals(1, hits["DELETE /sessions/{id}"])
}
@Test
fun sessionBlockHelperClosesOnException() = runTest {
val (hits, handler) = minimalSessionRoutes("sess_throw")
val ex = assertFailsWith<IllegalStateException> {
client(handler = handler).use { c ->
c.session<Nothing> { _ ->
error("boom")
}
}
}
assertEquals("boom", ex.message)
assertEquals(1, hits["POST /sessions"])
assertEquals(
1,
hits["DELETE /sessions/{id}"],
"DELETE must fire on exception unwind",
)
}
@Test
fun sessionCloseIdempotent() = runTest {
val (hits, handler) = minimalSessionRoutes("sess_idem")
client(handler = handler).use { c ->
val s = c.createSession()
s.close()
assertTrue(s.isClosed)
s.close() // short-circuit
s.close() // and again
assertEquals(
1,
hits["DELETE /sessions/{id}"],
"DELETE must hit ONCE across multiple close() calls",
)
}
}
@Test
fun sessionTurnRoundTrip() = runTest {
var sentBody: String? = null
var sentPath: String? = null
val c = client { req ->
val path = req.url.encodedPath
when {
req.method == HttpMethod.Post && path == "/sessions" -> respond(
"""{"session_id":"sess_rt","agent":"claude","created_at":1}""",
HttpStatusCode.OK,
jsonContentType,
)
req.method == HttpMethod.Post && path.endsWith("/turn") -> {
sentPath = path
sentBody = (req.body as io.ktor.http.content.TextContent).text
respond(
"""{"ok":true,"session_id":"sess_rt","turn_index":2,""" +
""""events":[""" +
"""{"type":"thinking","content":"..."},""" +
"""{"type":"tool_call","name":"Read",""" +
""""args":{"path":"x"},"result":"ok"},""" +
"""{"type":"text","content":"hello"},""" +
"""{"type":"text","content":" world"}""" +
"""],"stop_reason":"end_turn","duration_ms":123}""",
HttpStatusCode.OK,
jsonContentType,
)
}
req.method == HttpMethod.Delete -> respond(
"""{"ok":true}""",
HttpStatusCode.OK,
jsonContentType,
)
else -> respondError(HttpStatusCode.MethodNotAllowed)
}
}
c.use { client ->
client.session { s ->
val r = s.turn(prompt = "Read README", files = listOf("ff_abc", "ff_def"))
assertTrue(r.ok)
assertEquals("sess_rt", r.sessionId)
assertEquals(2, r.turnIndex)
assertEquals("end_turn", r.stopReason)
assertEquals(123L, r.durationMs)
assertEquals(4, r.events.size)
assertEquals("hello world", r.text())
}
}
assertNotNull(sentBody)
assertContains(sentBody!!, "\"prompt\":\"Read README\"")
assertContains(sentBody!!, "\"files\":[\"ff_abc\",\"ff_def\"]")
assertEquals("/sessions/sess_rt/turn", sentPath)
}
@Test
fun sessionTurnAfterCloseThrows() = runTest {
val (_, handler) = minimalSessionRoutes("sess_dead")
client(handler = handler).use { c ->
val s = c.createSession()
s.close()
val ex = assertFailsWith<IllegalStateException> {
s.turn("anybody home?")
}
assertContains(ex.message ?: "", "closed")
}
}
@Test
fun sessionTurnHttpTimeoutHonorsPerCallTimeoutSecs() = runTest {
// Audit-fix 3c77ef5 regression mirror: per-call timeout overrides
// the default. Global floor: defaultTimeout=5s + requestMargin=1s = 6s.
// Server delays 8s before responding; caller passes timeoutSecs=20.
// Without the per-call override: HttpRequestTimeoutException at 6s.
// With the override: succeeds.
val c = client(defaultTimeout = 5.seconds, requestMargin = 1.seconds) { req ->
val path = req.url.encodedPath
when {
req.method == HttpMethod.Post && path == "/sessions" -> respond(
"""{"session_id":"sess_to","agent":"claude","created_at":1}""",
HttpStatusCode.OK,
jsonContentType,
)
req.method == HttpMethod.Post && path.endsWith("/turn") -> {
delay(8.seconds)
respond(
"""{"ok":true,"session_id":"sess_to","turn_index":0,""" +
""""events":[{"type":"text","content":"done"}],""" +
""""stop_reason":"end_turn","duration_ms":8000}""",
HttpStatusCode.OK,
jsonContentType,
)
}
req.method == HttpMethod.Delete -> respond(
"""{"ok":true}""",
HttpStatusCode.OK,
jsonContentType,
)
else -> respondError(HttpStatusCode.MethodNotAllowed)
}
}
c.use { client ->
client.session { s ->
val r = s.turn("long-running", timeoutSecs = 20)
assertEquals("done", r.text())
}
}
}
@Test
fun turnResultTextConcatenates() {
val r = TurnResult(
ok = true,
sessionId = "s",
turnIndex = 0,
events = listOf(
TurnEvent(type = "thinking", content = "ignored"),
TurnEvent(type = "text", content = "alpha "),
TurnEvent(
type = "tool_call",
name = "Read",
args = buildJsonObject { put("p", "x") },
result = JsonPrimitive("result"),
),
TurnEvent(type = "text", content = "beta"),
TurnEvent(type = "text", content = null), // skipped
),
stopReason = "end_turn",
durationMs = 10L,
)
assertEquals("alpha beta", r.text())
}
@Test
fun listSessions() = runTest {
val c = client { req ->
assertEquals(HttpMethod.Get, req.method)
assertEquals("/sessions", req.url.encodedPath)
respond(
"""{"sessions":[""" +
"""{"session_id":"a","agent":"claude","app_name":"app1",""" +
""""created_at":100,"last_turn_at":150,"turn_count":3,""" +
""""closed_at":null},""" +
"""{"session_id":"b","agent":"claude","app_name":"app1",""" +
""""created_at":200,"last_turn_at":null,"turn_count":0,""" +
""""closed_at":250}""" +
"""]}""",
HttpStatusCode.OK,
jsonContentType,
)
}
c.use {
val ss = it.listSessions()
assertEquals(2, ss.size)
assertEquals("a", ss[0].sessionId)
assertEquals("app1", ss[0].appName)
assertEquals(150L, ss[0].lastTurnAt)
assertNull(ss[0].closedAt)
assertEquals(3, ss[0].turnCount)
assertEquals("b", ss[1].sessionId)
assertNull(ss[1].lastTurnAt)
assertEquals(250L, ss[1].closedAt)
}
}
@Test
fun getSession() = runTest {
val c = client { req ->
assertEquals(HttpMethod.Get, req.method)
assertEquals("/sessions/sess_g", req.url.encodedPath)
respond(
"""{"session_id":"sess_g","agent":"claude","app_name":"app1",""" +
""""created_at":100,"last_turn_at":120,"turn_count":1,""" +
""""closed_at":null}""",
HttpStatusCode.OK,
jsonContentType,
)
}
c.use {
val st = it.getSession("sess_g")
assertEquals("sess_g", st.sessionId)
assertEquals("claude", st.agent)
assertEquals("app1", st.appName)
assertEquals(100L, st.createdAt)
assertEquals(120L, st.lastTurnAt)
assertEquals(1, st.turnCount)
assertNull(st.closedAt)
}
}
@Test
fun crossTokenIs404() = runTest {
val c = client(token = "cf_other") { req ->
assertEquals(HttpMethod.Get, req.method)
respondError(HttpStatusCode.NotFound, """{"detail":"no such session"}""")
}
c.use {
val ex = assertFailsWith<ForgeApiException> {
it.getSession("sess_owned_by_a")
}
assertEquals(404, ex.statusCode)
assertContains(ex.body, "no such session")
}
}
@Test
fun sessionToStringDoesNotLeakToken() = runTest {
val (_, handler) = minimalSessionRoutes("sess_redact")
val c = ForgeClient(
baseUrl = "http://forge.test",
token = "cf_super_secret_do_not_log",
options = ForgeOptions(engine = MockEngine { req -> handler(req) }),
)
c.use { client ->
client.session { s ->
val repr = s.toString()
assertFalse(
repr.contains("cf_super_secret_do_not_log"),
"Session.toString() must not include the bearer; got: $repr",
)
assertFalse(
repr.contains("ForgeClient"),
"Session.toString() should not embed the client; got: $repr",
)
assertFalse(
repr.lowercase().contains("token"),
"Session.toString() should not even hint at a token field; got: $repr",
)
assertContains(repr, "id=sess_redact")
assertContains(repr, "closed=false")
}
}
}
@Test
fun concurrentTurnsAreSerialized() = runTest {
// Two coroutines call s.turn() concurrently. The mutex on Session
// must serialize them so the server only ever sees one in-flight
// turn at a time. We assert: max-observed in-flight == 1, and both
// turns complete.
val inFlight = AtomicInteger()
val maxInFlight = AtomicInteger()
val firstTurnReleased = CompletableDeferred<Unit>()
val firstTurnEntered = CompletableDeferred<Unit>()
val turnCounter = AtomicInteger()
val c = client { req ->
val path = req.url.encodedPath
when {
req.method == HttpMethod.Post && path == "/sessions" -> respond(
"""{"session_id":"sess_par","agent":"claude","created_at":1}""",
HttpStatusCode.OK,
jsonContentType,
)
req.method == HttpMethod.Post && path.endsWith("/turn") -> {
val now = inFlight.incrementAndGet()
maxInFlight.updateAndGet { kotlin.math.max(it, now) }
val idx = turnCounter.getAndIncrement()
if (idx == 0) {
// First turn: signal entry, then block until released.
firstTurnEntered.complete(Unit)
firstTurnReleased.await()
}
inFlight.decrementAndGet()
respond(
"""{"ok":true,"session_id":"sess_par","turn_index":$idx,""" +
""""events":[{"type":"text","content":"r$idx"}],""" +
""""stop_reason":"end_turn","duration_ms":1}""",
HttpStatusCode.OK,
jsonContentType,
)
}
req.method == HttpMethod.Delete -> respond(
"""{"ok":true}""",
HttpStatusCode.OK,
jsonContentType,
)
else -> respondError(HttpStatusCode.MethodNotAllowed)
}
}
c.use { client ->
client.session { s ->
coroutineScope {
val a = async { s.turn("first") }
// Wait until the first turn has actually entered the
// mock handler before kicking off the second; otherwise
// we could race past the mutex acquisition itself.
firstTurnEntered.await()
val b = async { s.turn("second") }
// Now release the first; the second is queued behind it.
firstTurnReleased.complete(Unit)
val results = awaitAll(a, b)
assertEquals("r0", results[0].text())
assertEquals("r1", results[1].text())
}
}
}
assertEquals(2, turnCounter.get())
assertEquals(
1,
maxInFlight.get(),
"concurrent turns on a Session must be serialized via the per-session mutex",
)
}
@Test
fun createSessionSendsAgentAndMeta() = runTest {
var sentBody: String? = null
val c = client { req ->
assertEquals(HttpMethod.Post, req.method)
assertEquals("/sessions", req.url.encodedPath)
sentBody = (req.body as io.ktor.http.content.TextContent).text
respond(
"""{"session_id":"sess_meta","agent":"claude","created_at":1}""",
HttpStatusCode.OK,
jsonContentType,
)
}
c.use {
val opts = SessionOptions(
agent = "claude",
meta = buildJsonObject { put("trace", "abc-123") },
)
val s = it.createSession(opts)
assertEquals("sess_meta", s.id)
}
assertNotNull(sentBody)
assertContains(sentBody!!, "\"agent\":\"claude\"")
assertContains(sentBody!!, "\"trace\":\"abc-123\"")
}
@Test
fun closeOnTransientFailureClearsFlag() = runTest {
val deletes = AtomicInteger()
val c = client { req ->
val path = req.url.encodedPath
when {
req.method == HttpMethod.Post && path == "/sessions" -> respond(
"""{"session_id":"sess_retry","agent":"claude","created_at":1}""",
HttpStatusCode.OK,
jsonContentType,
)
req.method == HttpMethod.Delete -> {
val n = deletes.incrementAndGet()
if (n == 1) {
respondError(HttpStatusCode.InternalServerError, """{"detail":"transient"}""")
} else {
respond("""{"ok":true}""", HttpStatusCode.OK, jsonContentType)
}
}
else -> respondError(HttpStatusCode.MethodNotAllowed)
}
}
c.use { client ->
val s = client.createSession()
assertFailsWith<ForgeApiException> { s.close() }
assertFalse(s.isClosed, "transient failure must clear the closed flag for retry")
// Retry succeeds.
s.close()
assertTrue(s.isClosed)
assertEquals(2, deletes.get())
}
}
@Test
fun v01RunUnchanged() = runTest {
// Regression: /run on a v0.2-equipped client must behave exactly
// as v0.1, including snake_case body shape and JsonElement result.
var captured: String? = null
val c = client { req ->
assertEquals(HttpMethod.Post, req.method)
assertEquals("/run", req.url.encodedPath)
captured = (req.body as io.ktor.http.content.TextContent).text
respond(
"""{"ok":true,"result":{"hello":"world"},""" +
""""duration_ms":1234,"stop_reason":"end_turn"}""",
HttpStatusCode.OK,
jsonContentType,
)
}
c.use {
val r = it.run(RunRequest(prompt = "hi", model = "sonnet", timeoutSecs = 60))
assertTrue(r.ok)
assertEquals(1234L, r.durationMs)
assertEquals("end_turn", r.stopReason)
val obj: JsonObject = r.result.jsonObject
assertEquals("world", (obj["hello"] as JsonPrimitive).content)
}
assertNotNull(captured)
assertContains(captured!!, "\"timeout_secs\":60")
assertContains(captured!!, "\"model\":\"sonnet\"")
}
}