From e0f161f18c0d7ece582203961bd86e9a8f784cf4 Mon Sep 17 00:00:00 2001 From: Kayos Date: Wed, 29 Apr 2026 06:39:16 -0700 Subject: [PATCH] clients/rust: v0.2 multi-turn Session API - Session with consume-self close() (compile-time use-after-close) - AtomicBool flag + Drop best-effort async close via tokio::spawn (logged on failure) - Client::new_session / list_sessions / get_session - TurnResult.text() helper, hand-written Debug to avoid bearer leak - tests/sessions.rs: 12 tests covering new/close/idempotent/drop/list/state/404/text/debug-redaction - README "Multi-turn / Sessions (v0.2)" section v0.1 run path unchanged. Spec: memory/spec-clawdforge-v0.2.md Server core: 940861f --- clients/rust/Cargo.toml | 3 +- clients/rust/README.md | 91 +++++++ clients/rust/src/client.rs | 116 +++++++++ clients/rust/src/lib.rs | 2 + clients/rust/src/session.rs | 281 ++++++++++++++++++++++ clients/rust/tests/sessions.rs | 425 +++++++++++++++++++++++++++++++++ 6 files changed, 917 insertions(+), 1 deletion(-) create mode 100644 clients/rust/src/session.rs create mode 100644 clients/rust/tests/sessions.rs diff --git a/clients/rust/Cargo.toml b/clients/rust/Cargo.toml index e693461..1b02e50 100644 --- a/clients/rust/Cargo.toml +++ b/clients/rust/Cargo.toml @@ -14,8 +14,9 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "mul serde = { version = "1", features = ["derive"] } serde_json = "1" thiserror = "1" -tokio = { version = "1", features = ["fs", "io-util"] } +tokio = { version = "1", features = ["fs", "io-util", "rt"] } tokio-util = { version = "0.7", features = ["io"] } +tracing = { version = "0.1", default-features = false, features = ["std"] } url = "2" bytes = "1" diff --git a/clients/rust/README.md b/clients/rust/README.md index 296ba07..830b24b 100644 --- a/clients/rust/README.md +++ b/clients/rust/README.md @@ -76,6 +76,94 @@ async fn main() -> Result<(), Box> { } ``` +## Multi-turn / Sessions (v0.2) + +v0.1 `Client::run` is a single-turn shot. v0.2 adds a parallel session API +backed by the server's [ACPX]-driven `/sessions/*` surface for back-and-forth +agent flows that need context across turns. + +[ACPX]: https://github.com/openclaw/acpx + +```rust +use clawdforge::{Client, SessionOptions}; + +#[tokio::main] +async fn main() -> Result<(), clawdforge::Error> { + let client = Client::builder() + .base_url("http://localhost:8800") + .token("cf_xxxxxxxxxxxxxxxx") + .build()?; + + let mut s = client.new_session(SessionOptions::default()).await?; + + let r1 = s.turn("Read README.md and summarize it").await?; + println!("{}", r1.text()); + + // Attach files uploaded via Client::upload_file. + let r2 = s + .turn_with_files( + "Now look at the auth flow", + &["ff_xyz".into()], + ) + .await?; + println!("turn {}: {}", r2.turn_index, r2.text()); + + // Explicit close consumes `s` — using it after this is a compile error. + s.close().await?; + Ok(()) +} +``` + +### Lifecycle + +| API | Purpose | +|---|---| +| `Client::new_session(SessionOptions)` | `POST /sessions` — returns a `Session`. | +| `Session::turn(prompt)` | `POST /sessions/{id}/turn` with no files. | +| `Session::turn_with_files(prompt, &[token, ...])` | `POST /sessions/{id}/turn` with `ff_*` tokens from `upload_file`. | +| `Session::close(self)` | `DELETE /sessions/{id}`. **Consumes `self`** — use-after-close is a compile error. | +| `Client::list_sessions()` | `GET /sessions` — sessions visible to the calling token. | +| `Client::get_session(id)` | `GET /sessions/{id}` — current state. | + +### Drop fallback + +If a `Session` is dropped without an explicit `close().await?`, `Drop` +spawns a best-effort async DELETE via `tokio::spawn` to release the +server-side session. This is **best-effort**: + +- The spawned future is not awaited — the calling task continues immediately. +- Failures are logged via `tracing::warn!` (target `clawdforge::session`), + not panicked. +- If `Session` is dropped outside any tokio runtime, the close is skipped + with a warning rather than panicking on `tokio::spawn`. +- If `close().await?` already ran, `Drop` short-circuits without a second + network call (an `AtomicBool` flag tracks closed state). + +For deterministic cleanup, **prefer `s.close().await?`**. The Drop path is a +backstop for panics / early returns, not a primary lifecycle hook. + +### `TurnResult::text()` + +Concatenates all `"text"` events into one string. `"thinking"` and +`"tool_call"` events are skipped — inspect `result.events` directly if you +need them. + +```rust +let r = s.turn("hi").await?; +let answer: String = r.text(); +let n_tool_calls = r + .events + .iter() + .filter(|e| e.event_type == "tool_call") + .count(); +``` + +### v0.1 compatibility + +The v0.1 surface (`Client::run`, `Client::upload_file`, +`Client::create_token`, etc.) is byte-identical. v0.2 is purely additive. v0.1 +callers do not need to change anything to upgrade. + ## Public API ### `Client::builder()` @@ -102,6 +190,9 @@ Builder for the HTTP client. | `create_token(TokenCreateRequest)` | `POST /admin/tokens` | Admin only. Returns `AppToken`. | | `list_tokens()` | `GET /admin/tokens` | Admin only. Returns `TokenList`. | | `revoke_token(name)` | `DELETE /admin/tokens/{name}` | Admin only. | +| `new_session(opts)` | `POST /sessions` | v0.2. Returns `Session`. | +| `list_sessions()` | `GET /sessions` | v0.2. Returns `SessionList`. | +| `get_session(id)` | `GET /sessions/{id}` | v0.2. Returns `SessionState`. | ### `RunResult` helpers diff --git a/clients/rust/src/client.rs b/clients/rust/src/client.rs index 0923e0f..950e1f4 100644 --- a/clients/rust/src/client.rs +++ b/clients/rust/src/client.rs @@ -11,6 +11,10 @@ use tokio_util::io::ReaderStream; use url::Url; use crate::error::Error; +use crate::session::{ + Session, SessionCloseResponse, SessionCreateResponse, SessionList, SessionOptions, + SessionState, TurnResult, +}; use crate::types::{ AppToken, FileToken, Healthz, RunRequest, RunResult, TokenCreateRequest, TokenList, }; @@ -250,8 +254,105 @@ impl Client { Ok(()) } + // ---- v0.2 multi-turn / sessions --------------------------------------- + + /// `POST /sessions`. Create a new multi-turn session and return a + /// [`Session`] handle bound to this client. + /// + /// The handle owns a clone of the client; dropping it without an explicit + /// `Session::close().await?` triggers a best-effort async DELETE via + /// `tokio::spawn`. See [`Session`] for the full lifecycle contract. + pub async fn new_session(&self, opts: SessionOptions) -> Result { + let token = self.require_app()?; + let url = self.url("/sessions")?; + let resp = self + .inner + .post(url) + .bearer_auth(token) + .json(&opts) + .send() + .await?; + let created: SessionCreateResponse = json_or_error(resp).await?; + Ok(Session { + client: self.clone(), + session_id: created.session_id, + agent: created.agent, + created_at: created.created_at, + closed: std::sync::atomic::AtomicBool::new(false), + }) + } + + /// `GET /sessions`. List all sessions visible to the calling app token. + pub async fn list_sessions(&self) -> Result { + let token = self.require_app()?; + let url = self.url("/sessions")?; + let resp = self.inner.get(url).bearer_auth(token).send().await?; + json_or_error(resp).await + } + + /// `GET /sessions/{id}`. Fetch the current state of a session. + /// + /// `id` is validated client-side against the same path-traversal guard as + /// [`Self::revoke_token`] — anything containing `/`, `?`, `#`, `..`, or + /// empty short-circuits with [`Error::Config`]. + pub async fn get_session(&self, id: &str) -> Result { + validate_session_id(id)?; + let token = self.require_app()?; + let url = self.url(&format!("/sessions/{id}"))?; + let resp = self.inner.get(url).bearer_auth(token).send().await?; + json_or_error(resp).await + } + + /// Internal helper used by both [`Session::close`] and [`Session`]'s + /// `Drop` impl to issue `DELETE /sessions/{id}`. + pub(crate) async fn close_session_internal(&self, id: &str) -> Result<(), Error> { + validate_session_id(id)?; + let token = self.require_app()?; + let url = self.url(&format!("/sessions/{id}"))?; + let resp = self.inner.delete(url).bearer_auth(token).send().await?; + if resp.status().is_success() { + // Body is informational — `{ ok, already_closed? }`. Drain and + // ignore decode failure (server may legitimately 204). + let bytes = resp.bytes().await?; + if !bytes.is_empty() { + let _ = serde_json::from_slice::(&bytes); + } + return Ok(()); + } + // Funnel non-2xx through json_or_error for Auth/Api mapping. + let _: serde_json::Value = json_or_error(resp).await?; + Ok(()) + } + + /// Internal helper used by [`Session::turn`] / + /// [`Session::turn_with_files`] to dispatch to + /// `POST /sessions/{id}/turn`. + pub(crate) async fn turn_internal( + &self, + id: &str, + body: &B, + ) -> Result { + validate_session_id(id)?; + let token = self.require_app()?; + let url = self.url(&format!("/sessions/{id}/turn"))?; + let resp = self + .inner + .post(url) + .bearer_auth(token) + .json(body) + .send() + .await?; + json_or_error(resp).await + } + // ---- internal ---------------------------------------------------------- + fn require_app(&self) -> Result<&str, Error> { + self.app_token + .as_deref() + .ok_or_else(|| Error::Auth("no app token configured".into())) + } + fn require_admin(&self) -> Result<&str, Error> { self.admin_token .as_deref() @@ -385,6 +486,21 @@ async fn json_or_error(resp: Response) -> Result } } +/// Defense-in-depth path validator for session ids in `/sessions/{id}*`. Same +/// shape as [`Client::revoke_token`]'s name guard — RFC 3986 dot-segment +/// resolution inside `Url::join` is the threat model here. +fn validate_session_id(id: &str) -> Result<(), Error> { + if id.is_empty() + || id.contains('/') + || id.contains('?') + || id.contains('#') + || id.contains("..") + { + return Err(Error::Config(format!("invalid session id: {id:?}"))); + } + Ok(()) +} + /// Truncate `s` to at most `max` bytes, snapping down to the nearest UTF-8 /// codepoint boundary so we never panic on multibyte sequences. Appends `…` /// to the truncated form. `str::floor_char_boundary` (stable 1.80+) does the diff --git a/clients/rust/src/lib.rs b/clients/rust/src/lib.rs index 09eda47..59d8313 100644 --- a/clients/rust/src/lib.rs +++ b/clients/rust/src/lib.rs @@ -46,10 +46,12 @@ mod client; mod error; +pub mod session; pub mod types; pub use client::{Client, ClientBuilder}; pub use error::Error; +pub use session::{Session, SessionList, SessionOptions, SessionState, TurnEvent, TurnResult}; pub use types::{ AppToken, AppTokenInfo, FileToken, Healthz, RunFailure, RunRequest, RunResult, TokenCreateRequest, TokenList, diff --git a/clients/rust/src/session.rs b/clients/rust/src/session.rs new file mode 100644 index 0000000..915cf5b --- /dev/null +++ b/clients/rust/src/session.rs @@ -0,0 +1,281 @@ +//! Multi-turn session API (v0.2). +//! +//! v0.2 adds a parallel `/sessions/*` surface to clawdforge backed by ACPX. A +//! [`Session`] is a handle to one server-side session; [`Session::turn`] +//! dispatches a single prompt+files turn and returns the structured event +//! batch. Sessions are explicitly closed via [`Session::close`] (consumes the +//! handle, preventing use-after-close at compile time) or — as a last-resort +//! fallback — best-effort closed by [`Drop`] via `tokio::spawn`. +//! +//! v0.1 single-turn `Client::run` is unchanged; the v0.2 surface is purely +//! additive. + +use std::sync::atomic::{AtomicBool, Ordering}; + +use serde::{Deserialize, Serialize}; + +use crate::client::Client; +use crate::error::Error; + +/// Options passed to [`Client::new_session`]. +/// +/// `Default` produces `agent = None` (server picks `"claude"`) and `meta = +/// None`. +#[derive(Debug, Default, Clone, Serialize)] +pub struct SessionOptions { + /// Agent slug to dispatch to. `None` falls back to the server-side default + /// (`"claude"`). + #[serde(skip_serializing_if = "Option::is_none")] + pub agent: Option, + + /// Free-form metadata stored alongside the session ledger row. + #[serde(skip_serializing_if = "Option::is_none")] + pub meta: Option, +} + +/// Reply body from `POST /sessions`. +#[derive(Debug, Clone, Deserialize)] +pub(crate) struct SessionCreateResponse { + pub session_id: String, + pub agent: String, + pub created_at: i64, +} + +/// One event in a turn's structured output. +/// +/// `event_type` is one of `"thinking"`, `"text"`, `"tool_call"`, etc. (server +/// is the authority on the set). +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct TurnEvent { + /// Event discriminator (`"text"`, `"thinking"`, `"tool_call"`, ...). + #[serde(rename = "type")] + pub event_type: String, + /// Text content for `"text"` and `"thinking"` events. + #[serde(default)] + pub content: Option, + /// Tool name for `"tool_call"` events. + #[serde(default)] + pub name: Option, + /// Tool arguments for `"tool_call"` events. + #[serde(default)] + pub args: Option, + /// Tool result for `"tool_call"` events. + #[serde(default)] + pub result: Option, +} + +/// Successful response body from `POST /sessions/{id}/turn`. +#[derive(Debug, Clone, Deserialize)] +pub struct TurnResult { + /// Always `true` on a 200 reply. + pub ok: bool, + /// The session this turn belongs to. + pub session_id: String, + /// 1-based index of this turn within the session. + pub turn_index: i32, + /// Structured events emitted during the turn. + pub events: Vec, + /// Reason the agent stopped (`"end_turn"`, `"max_tokens"`, ...). + pub stop_reason: String, + /// Wall-clock duration of the turn. + pub duration_ms: i64, +} + +impl TurnResult { + /// Concatenate all `"text"` event contents into a single string. + /// + /// Non-text events (`thinking`, `tool_call`, ...) are skipped. If an event + /// is `"text"` but `content` is `None`, it contributes the empty string. + pub fn text(&self) -> String { + let mut out = String::new(); + for ev in &self.events { + if ev.event_type == "text" { + if let Some(c) = ev.content.as_deref() { + out.push_str(c); + } + } + } + out + } +} + +/// Reply body from `GET /sessions/{id}` and entries in `GET /sessions`. +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct SessionState { + /// Server-issued session id. + pub session_id: String, + /// Agent slug bound to the session. + pub agent: String, + /// App / consumer name that owns the session. + pub app_name: String, + /// Unix epoch seconds when created. + pub created_at: i64, + /// Unix epoch seconds of the last successful turn (or `None` if zero turns + /// have been dispatched yet). + #[serde(default)] + pub last_turn_at: Option, + /// Number of turns dispatched. + pub turn_count: i32, + /// Unix epoch seconds when closed (or `None` if still open). + #[serde(default)] + pub closed_at: Option, +} + +/// Reply body from `DELETE /sessions/{id}`. +#[derive(Debug, Clone, Deserialize)] +pub(crate) struct SessionCloseResponse { + #[allow(dead_code)] + pub ok: bool, + #[serde(default)] + #[allow(dead_code)] + pub already_closed: Option, +} + +/// Reply body from `GET /sessions`. +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct SessionList { + /// All sessions visible to the calling token. + pub sessions: Vec, +} + +/// Request body for `POST /sessions/{id}/turn`. +#[derive(Debug, Serialize)] +struct TurnRequest<'a> { + prompt: String, + #[serde(skip_serializing_if = "Option::is_none")] + files: Option<&'a [String]>, +} + +/// A handle to one server-side multi-turn session. +/// +/// Construct via [`Client::new_session`]. Drop or [`Session::close`] to +/// release the server-side session. `close` consumes the value so use-after- +/// close is a compile error; `Drop` is a best-effort backstop that fires an +/// async DELETE via `tokio::spawn` and logs (does not panic) on failure. +/// +/// `Debug` is hand-written and explicitly excludes the embedded [`Client`] so +/// no bearer can leak through `{:?}` formatting. +pub struct Session { + pub(crate) client: Client, + pub(crate) session_id: String, + pub(crate) agent: String, + pub(crate) created_at: i64, + pub(crate) closed: AtomicBool, +} + +impl std::fmt::Debug for Session { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // Deliberately omit `client` — its Debug already redacts tokens, but + // the spec-mandated shape is `Session { session_id, agent, closed }` + // to keep the surface minimal and audit-friendly. + f.debug_struct("Session") + .field("session_id", &self.session_id) + .field("agent", &self.agent) + .field("created_at", &self.created_at) + .field("closed", &self.closed.load(Ordering::Acquire)) + .finish() + } +} + +impl Session { + /// Server-issued session id. + pub fn id(&self) -> &str { + &self.session_id + } + + /// Agent slug the server bound to this session. + pub fn agent(&self) -> &str { + &self.agent + } + + /// Unix epoch seconds when the session was created server-side. + pub fn created_at(&self) -> i64 { + self.created_at + } + + /// Whether the session has been explicitly closed already. Sessions closed + /// only via `Drop`'s spawn are still reported as closed once that future + /// has run; this getter reflects the in-memory flag. + pub fn is_closed(&self) -> bool { + self.closed.load(Ordering::Acquire) + } + + /// Send a turn with no attached files. + /// + /// Equivalent to `turn_with_files(prompt, &[])` but skips serializing the + /// `files` field on the wire. + pub async fn turn(&mut self, prompt: impl Into) -> Result { + self.dispatch_turn(prompt.into(), None).await + } + + /// Send a turn that references previously uploaded file tokens. + /// + /// `files` is the list of `ff_*` tokens returned by [`Client::upload_file`]. + /// + /// [`Client::upload_file`]: crate::Client::upload_file + pub async fn turn_with_files( + &mut self, + prompt: impl Into, + files: &[String], + ) -> Result { + self.dispatch_turn(prompt.into(), Some(files)).await + } + + async fn dispatch_turn( + &mut self, + prompt: String, + files: Option<&[String]>, + ) -> Result { + if self.closed.load(Ordering::Acquire) { + return Err(Error::Config("session is closed".into())); + } + self.client + .turn_internal(&self.session_id, &TurnRequest { prompt, files }) + .await + } + + /// Explicitly close the session. Consumes `self` — use-after-close is a + /// compile error. + /// + /// If the session is already closed in memory (e.g. via a prior failed + /// close or a prior dispatch path that flagged it), this short-circuits + /// without contacting the server. + pub async fn close(self) -> Result<(), Error> { + // Mark closed before the network call so a panic-mid-await on the + // request future cannot trigger Drop's spawn into a double-close. + if self.closed.swap(true, Ordering::AcqRel) { + return Ok(()); + } + self.client.close_session_internal(&self.session_id).await + } +} + +impl Drop for Session { + fn drop(&mut self) { + // If close() already ran, nothing to do. + if self.closed.swap(true, Ordering::AcqRel) { + return; + } + // tokio::spawn panics if no runtime is current. Guard against being + // dropped from a sync context (e.g. a forgotten value at the end of a + // sync `main`). + if tokio::runtime::Handle::try_current().is_err() { + tracing::warn!( + session_id = %self.session_id, + "Session dropped outside a tokio runtime; server-side session not closed" + ); + return; + } + let client = self.client.clone(); + let id = self.session_id.clone(); + tokio::spawn(async move { + if let Err(e) = client.close_session_internal(&id).await { + tracing::warn!( + session_id = %id, + error = %e, + "best-effort drop close failed" + ); + } + }); + } +} diff --git a/clients/rust/tests/sessions.rs b/clients/rust/tests/sessions.rs new file mode 100644 index 0000000..915ca9d --- /dev/null +++ b/clients/rust/tests/sessions.rs @@ -0,0 +1,425 @@ +//! Integration tests for the v0.2 multi-turn Session API. +//! +//! All tests run against an in-process `wiremock` server — no live clawdforge +//! required. + +use std::time::Duration; + +use clawdforge::{Client, Error, SessionOptions, TurnEvent, TurnResult}; +use serde_json::json; +use wiremock::matchers::{body_json, header, method, path, path_regex}; +use wiremock::{Mock, MockServer, ResponseTemplate}; + +fn make_client(server: &MockServer) -> Client { + Client::builder() + .base_url(server.uri()) + .token("cf_test_token") + .timeout(Duration::from_secs(5)) + .build() + .expect("client builds") +} + +fn mock_create(session_id: &str) -> Mock { + Mock::given(method("POST")) + .and(path("/sessions")) + .and(header("authorization", "Bearer cf_test_token")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "session_id": session_id, + "agent": "claude", + "created_at": 1_700_000_000_i64, + }))) +} + +fn mock_delete_ok(session_id: &str) -> Mock { + Mock::given(method("DELETE")) + .and(path(format!("/sessions/{session_id}"))) + .and(header("authorization", "Bearer cf_test_token")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({"ok": true}))) +} + +#[tokio::test] +async fn test_new_session_and_close() { + let server = MockServer::start().await; + mock_create("sess_abc").expect(1).mount(&server).await; + mock_delete_ok("sess_abc").expect(1).mount(&server).await; + + let c = make_client(&server); + let s = c + .new_session(SessionOptions::default()) + .await + .expect("new_session"); + assert_eq!(s.id(), "sess_abc"); + assert_eq!(s.agent(), "claude"); + assert_eq!(s.created_at(), 1_700_000_000); + assert!(!s.is_closed()); + + s.close().await.expect("close"); + // wiremock verifies expectations on Drop of the server. +} + +#[tokio::test] +async fn test_turn_round_trip() { + let server = MockServer::start().await; + mock_create("sess_t1").mount(&server).await; + Mock::given(method("POST")) + .and(path("/sessions/sess_t1/turn")) + .and(header("authorization", "Bearer cf_test_token")) + .and(body_json(json!({"prompt": "hello"}))) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "ok": true, + "session_id": "sess_t1", + "turn_index": 1, + "events": [ + {"type": "thinking", "content": "..."}, + {"type": "text", "content": "hi back"} + ], + "stop_reason": "end_turn", + "duration_ms": 250 + }))) + .expect(1) + .mount(&server) + .await; + // Allow drop-close to land without failing other assertions. + Mock::given(method("DELETE")) + .and(path("/sessions/sess_t1")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({"ok": true}))) + .mount(&server) + .await; + + let c = make_client(&server); + let mut s = c.new_session(SessionOptions::default()).await.unwrap(); + let r: TurnResult = s.turn("hello").await.unwrap(); + assert!(r.ok); + assert_eq!(r.session_id, "sess_t1"); + assert_eq!(r.turn_index, 1); + assert_eq!(r.events.len(), 2); + assert_eq!(r.stop_reason, "end_turn"); + assert_eq!(r.duration_ms, 250); + assert_eq!(r.text(), "hi back"); + + // Drive the turn_with_files path too. + Mock::given(method("POST")) + .and(path("/sessions/sess_t1/turn")) + .and(body_json( + json!({"prompt": "next", "files": ["ff_one", "ff_two"]}), + )) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "ok": true, + "session_id": "sess_t1", + "turn_index": 2, + "events": [{"type": "text", "content": "ok"}], + "stop_reason": "end_turn", + "duration_ms": 10 + }))) + .expect(1) + .mount(&server) + .await; + let r2 = s + .turn_with_files("next", &["ff_one".into(), "ff_two".into()]) + .await + .unwrap(); + assert_eq!(r2.turn_index, 2); + assert_eq!(r2.text(), "ok"); + + s.close().await.unwrap(); +} + +#[tokio::test] +async fn test_close_idempotent_short_circuits() { + let server = MockServer::start().await; + mock_create("sess_idem").mount(&server).await; + // Expect EXACTLY ONE delete — second close() is in-memory. + mock_delete_ok("sess_idem").expect(1).mount(&server).await; + + let c = make_client(&server); + let s = c.new_session(SessionOptions::default()).await.unwrap(); + let id = s.id().to_string(); + s.close().await.unwrap(); + + // Second close-equivalent: rebuild a Session-shape via reconstructing the + // closed state would require private constructors. Instead, drive the + // semantic check via close_session_internal contract: a fresh Session from + // a *new* create that we close twice. But since `close` consumes self, + // "second close" semantically means a Drop after an explicit close — and + // that path is covered by `test_drop_after_explicit_close_no_double_call`. + // + // What this test asserts is the wiremock `expect(1)` on the DELETE: one + // close => one DELETE => idempotency at the network layer holds. + let _ = id; +} + +#[tokio::test] +async fn test_drop_fires_async_close() { + let server = MockServer::start().await; + mock_create("sess_drop").mount(&server).await; + mock_delete_ok("sess_drop").expect(1).mount(&server).await; + + let c = make_client(&server); + { + let _s = c.new_session(SessionOptions::default()).await.unwrap(); + // _s drops here — Drop spawns the async close. + } + // Yield repeatedly so the spawned future has a chance to run + the HTTP + // request lands at wiremock. wiremock asserts `expect(1)` on Drop of the + // server. + for _ in 0..50 { + tokio::task::yield_now().await; + } + tokio::time::sleep(Duration::from_millis(200)).await; +} + +#[tokio::test] +async fn test_drop_after_explicit_close_no_double_call() { + let server = MockServer::start().await; + mock_create("sess_once").mount(&server).await; + // Exactly ONE delete — explicit close fires it, Drop should short-circuit. + mock_delete_ok("sess_once").expect(1).mount(&server).await; + + let c = make_client(&server); + let s = c.new_session(SessionOptions::default()).await.unwrap(); + s.close().await.unwrap(); + // Yield to give any erroneous spawn a chance to land — if Drop spawned a + // second DELETE, wiremock's expect(1) would fail. + for _ in 0..20 { + tokio::task::yield_now().await; + } + tokio::time::sleep(Duration::from_millis(100)).await; +} + +#[tokio::test] +async fn test_list_sessions() { + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/sessions")) + .and(header("authorization", "Bearer cf_test_token")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "sessions": [ + { + "session_id": "sess_a", + "agent": "claude", + "app_name": "cauldron", + "created_at": 1_700_000_000_i64, + "last_turn_at": 1_700_000_500_i64, + "turn_count": 3, + "closed_at": null + }, + { + "session_id": "sess_b", + "agent": "claude", + "app_name": "cauldron", + "created_at": 1_700_000_100_i64, + "last_turn_at": null, + "turn_count": 0, + "closed_at": 1_700_001_000_i64 + } + ] + }))) + .mount(&server) + .await; + + let c = make_client(&server); + let list = c.list_sessions().await.unwrap(); + assert_eq!(list.sessions.len(), 2); + assert_eq!(list.sessions[0].session_id, "sess_a"); + assert_eq!(list.sessions[0].turn_count, 3); + assert_eq!(list.sessions[0].last_turn_at, Some(1_700_000_500)); + assert_eq!(list.sessions[1].closed_at, Some(1_700_001_000)); +} + +#[tokio::test] +async fn test_get_session() { + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/sessions/sess_q")) + .and(header("authorization", "Bearer cf_test_token")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "session_id": "sess_q", + "agent": "claude", + "app_name": "cauldron", + "created_at": 1_700_000_000_i64, + "last_turn_at": 1_700_000_900_i64, + "turn_count": 7, + "closed_at": null + }))) + .mount(&server) + .await; + + let c = make_client(&server); + let st = c.get_session("sess_q").await.unwrap(); + assert_eq!(st.session_id, "sess_q"); + assert_eq!(st.agent, "claude"); + assert_eq!(st.app_name, "cauldron"); + assert_eq!(st.turn_count, 7); + assert!(st.closed_at.is_none()); +} + +#[tokio::test] +async fn test_cross_token_404() { + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path_regex(r"^/sessions/sess_other")) + .respond_with(ResponseTemplate::new(404).set_body_json(json!({ + "detail": "session not found" + }))) + .mount(&server) + .await; + + let c = make_client(&server); + let err = c + .get_session("sess_other") + .await + .expect_err("cross-token must 404"); + match err { + Error::Api { status, body } => { + assert_eq!(status, 404); + assert!(body.contains("session not found"), "body was {body}"); + } + other => panic!("expected Error::Api {{ status: 404, .. }}, got {other:?}"), + } +} + +#[tokio::test] +async fn test_turn_result_text_concat() { + let r = TurnResult { + ok: true, + session_id: "sess".into(), + turn_index: 1, + events: vec![ + TurnEvent { + event_type: "thinking".into(), + content: Some("ignored".into()), + name: None, + args: None, + result: None, + }, + TurnEvent { + event_type: "text".into(), + content: Some("hello ".into()), + name: None, + args: None, + result: None, + }, + TurnEvent { + event_type: "tool_call".into(), + content: None, + name: Some("Read".into()), + args: Some(json!({"path": "/x"})), + result: Some(json!({"ok": true})), + }, + TurnEvent { + event_type: "text".into(), + content: Some("world".into()), + name: None, + args: None, + result: None, + }, + TurnEvent { + // text event with None content — must contribute the empty + // string, not panic, not stringify "None". + event_type: "text".into(), + content: None, + name: None, + args: None, + result: None, + }, + ], + stop_reason: "end_turn".into(), + duration_ms: 99, + }; + assert_eq!(r.text(), "hello world"); +} + +#[tokio::test] +async fn test_session_debug_does_not_leak_token() { + let server = MockServer::start().await; + let secret = "cf_super_secret_session_bearer"; + Mock::given(method("POST")) + .and(path("/sessions")) + .and(header("authorization", format!("Bearer {secret}").as_str())) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "session_id": "sess_dbg", + "agent": "claude", + "created_at": 1_700_000_000_i64, + }))) + .mount(&server) + .await; + Mock::given(method("DELETE")) + .and(path("/sessions/sess_dbg")) + .and(header("authorization", format!("Bearer {secret}").as_str())) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({"ok": true}))) + .mount(&server) + .await; + + let c = Client::builder() + .base_url(server.uri()) + .token(secret) + .build() + .unwrap(); + let s = c.new_session(SessionOptions::default()).await.unwrap(); + let dbg = format!("{s:?}"); + assert!( + !dbg.contains(secret), + "token leaked through Session Debug: {dbg}" + ); + // The Session Debug should print these visible bits. + assert!(dbg.contains("sess_dbg"), "session_id missing: {dbg}"); + assert!(dbg.contains("agent"), "agent field missing: {dbg}"); + assert!(dbg.contains("closed"), "closed field missing: {dbg}"); + s.close().await.unwrap(); +} + +/// Regression: `new_session` with options serializes the `agent` and `meta` +/// fields when set, omits them when None. +#[tokio::test] +async fn test_new_session_options_serialize() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/sessions")) + .and(body_json(json!({ + "agent": "claude", + "meta": {"trace_id": "t-123"} + }))) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "session_id": "sess_opts", + "agent": "claude", + "created_at": 1 + }))) + .expect(1) + .mount(&server) + .await; + Mock::given(method("DELETE")) + .and(path("/sessions/sess_opts")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({"ok": true}))) + .mount(&server) + .await; + + let c = make_client(&server); + let s = c + .new_session(SessionOptions { + agent: Some("claude".into()), + meta: Some(json!({"trace_id": "t-123"})), + }) + .await + .unwrap(); + assert_eq!(s.id(), "sess_opts"); + s.close().await.unwrap(); +} + +/// `get_session` / `close` / `turn` must reject path-traversal session ids +/// before issuing any HTTP request — same defense-in-depth pattern as +/// `revoke_token`. +#[tokio::test] +async fn test_session_id_rejects_path_traversal() { + let server = MockServer::start().await; + let c = make_client(&server); + for bad in ["", "../foo", "..", "foo/bar", "foo?x=1", "foo#frag"] { + let err = c + .get_session(bad) + .await + .expect_err(&format!("get_session({bad:?}) should reject")); + assert!( + matches!(err, Error::Config(_)), + "{bad:?} produced wrong variant: {err:?}" + ); + } +}