clients/rust: v0.2 multi-turn Session API
- Session with consume-self close() (compile-time use-after-close)
- AtomicBool flag + Drop best-effort async close via tokio::spawn (logged on failure)
- Client::new_session / list_sessions / get_session
- TurnResult.text() helper, hand-written Debug to avoid bearer leak
- tests/sessions.rs: 12 tests covering new/close/idempotent/drop/list/state/404/text/debug-redaction
- README "Multi-turn / Sessions (v0.2)" section
v0.1 run path unchanged.
Spec: memory/spec-clawdforge-v0.2.md
Server core: 940861f
This commit is contained in:
parent
3a590d775e
commit
e0f161f18c
6 changed files with 917 additions and 1 deletions
|
|
@ -14,8 +14,9 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "mul
|
|||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
thiserror = "1"
|
||||
tokio = { version = "1", features = ["fs", "io-util"] }
|
||||
tokio = { version = "1", features = ["fs", "io-util", "rt"] }
|
||||
tokio-util = { version = "0.7", features = ["io"] }
|
||||
tracing = { version = "0.1", default-features = false, features = ["std"] }
|
||||
url = "2"
|
||||
bytes = "1"
|
||||
|
||||
|
|
|
|||
|
|
@ -76,6 +76,94 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
}
|
||||
```
|
||||
|
||||
## Multi-turn / Sessions (v0.2)
|
||||
|
||||
v0.1 `Client::run` is a single-turn shot. v0.2 adds a parallel session API
|
||||
backed by the server's [ACPX]-driven `/sessions/*` surface for back-and-forth
|
||||
agent flows that need context across turns.
|
||||
|
||||
[ACPX]: https://github.com/openclaw/acpx
|
||||
|
||||
```rust
|
||||
use clawdforge::{Client, SessionOptions};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), clawdforge::Error> {
|
||||
let client = Client::builder()
|
||||
.base_url("http://localhost:8800")
|
||||
.token("cf_xxxxxxxxxxxxxxxx")
|
||||
.build()?;
|
||||
|
||||
let mut s = client.new_session(SessionOptions::default()).await?;
|
||||
|
||||
let r1 = s.turn("Read README.md and summarize it").await?;
|
||||
println!("{}", r1.text());
|
||||
|
||||
// Attach files uploaded via Client::upload_file.
|
||||
let r2 = s
|
||||
.turn_with_files(
|
||||
"Now look at the auth flow",
|
||||
&["ff_xyz".into()],
|
||||
)
|
||||
.await?;
|
||||
println!("turn {}: {}", r2.turn_index, r2.text());
|
||||
|
||||
// Explicit close consumes `s` — using it after this is a compile error.
|
||||
s.close().await?;
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
### Lifecycle
|
||||
|
||||
| API | Purpose |
|
||||
|---|---|
|
||||
| `Client::new_session(SessionOptions)` | `POST /sessions` — returns a `Session`. |
|
||||
| `Session::turn(prompt)` | `POST /sessions/{id}/turn` with no files. |
|
||||
| `Session::turn_with_files(prompt, &[token, ...])` | `POST /sessions/{id}/turn` with `ff_*` tokens from `upload_file`. |
|
||||
| `Session::close(self)` | `DELETE /sessions/{id}`. **Consumes `self`** — use-after-close is a compile error. |
|
||||
| `Client::list_sessions()` | `GET /sessions` — sessions visible to the calling token. |
|
||||
| `Client::get_session(id)` | `GET /sessions/{id}` — current state. |
|
||||
|
||||
### Drop fallback
|
||||
|
||||
If a `Session` is dropped without an explicit `close().await?`, `Drop`
|
||||
spawns a best-effort async DELETE via `tokio::spawn` to release the
|
||||
server-side session. This is **best-effort**:
|
||||
|
||||
- The spawned future is not awaited — the calling task continues immediately.
|
||||
- Failures are logged via `tracing::warn!` (target `clawdforge::session`),
|
||||
not panicked.
|
||||
- If `Session` is dropped outside any tokio runtime, the close is skipped
|
||||
with a warning rather than panicking on `tokio::spawn`.
|
||||
- If `close().await?` already ran, `Drop` short-circuits without a second
|
||||
network call (an `AtomicBool` flag tracks closed state).
|
||||
|
||||
For deterministic cleanup, **prefer `s.close().await?`**. The Drop path is a
|
||||
backstop for panics / early returns, not a primary lifecycle hook.
|
||||
|
||||
### `TurnResult::text()`
|
||||
|
||||
Concatenates all `"text"` events into one string. `"thinking"` and
|
||||
`"tool_call"` events are skipped — inspect `result.events` directly if you
|
||||
need them.
|
||||
|
||||
```rust
|
||||
let r = s.turn("hi").await?;
|
||||
let answer: String = r.text();
|
||||
let n_tool_calls = r
|
||||
.events
|
||||
.iter()
|
||||
.filter(|e| e.event_type == "tool_call")
|
||||
.count();
|
||||
```
|
||||
|
||||
### v0.1 compatibility
|
||||
|
||||
The v0.1 surface (`Client::run`, `Client::upload_file`,
|
||||
`Client::create_token`, etc.) is byte-identical. v0.2 is purely additive. v0.1
|
||||
callers do not need to change anything to upgrade.
|
||||
|
||||
## Public API
|
||||
|
||||
### `Client::builder()`
|
||||
|
|
@ -102,6 +190,9 @@ Builder for the HTTP client.
|
|||
| `create_token(TokenCreateRequest)` | `POST /admin/tokens` | Admin only. Returns `AppToken`. |
|
||||
| `list_tokens()` | `GET /admin/tokens` | Admin only. Returns `TokenList`. |
|
||||
| `revoke_token(name)` | `DELETE /admin/tokens/{name}` | Admin only. |
|
||||
| `new_session(opts)` | `POST /sessions` | v0.2. Returns `Session`. |
|
||||
| `list_sessions()` | `GET /sessions` | v0.2. Returns `SessionList`. |
|
||||
| `get_session(id)` | `GET /sessions/{id}` | v0.2. Returns `SessionState`. |
|
||||
|
||||
### `RunResult` helpers
|
||||
|
||||
|
|
|
|||
|
|
@ -11,6 +11,10 @@ use tokio_util::io::ReaderStream;
|
|||
use url::Url;
|
||||
|
||||
use crate::error::Error;
|
||||
use crate::session::{
|
||||
Session, SessionCloseResponse, SessionCreateResponse, SessionList, SessionOptions,
|
||||
SessionState, TurnResult,
|
||||
};
|
||||
use crate::types::{
|
||||
AppToken, FileToken, Healthz, RunRequest, RunResult, TokenCreateRequest, TokenList,
|
||||
};
|
||||
|
|
@ -250,8 +254,105 @@ impl Client {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
// ---- v0.2 multi-turn / sessions ---------------------------------------
|
||||
|
||||
/// `POST /sessions`. Create a new multi-turn session and return a
|
||||
/// [`Session`] handle bound to this client.
|
||||
///
|
||||
/// The handle owns a clone of the client; dropping it without an explicit
|
||||
/// `Session::close().await?` triggers a best-effort async DELETE via
|
||||
/// `tokio::spawn`. See [`Session`] for the full lifecycle contract.
|
||||
pub async fn new_session(&self, opts: SessionOptions) -> Result<Session, Error> {
|
||||
let token = self.require_app()?;
|
||||
let url = self.url("/sessions")?;
|
||||
let resp = self
|
||||
.inner
|
||||
.post(url)
|
||||
.bearer_auth(token)
|
||||
.json(&opts)
|
||||
.send()
|
||||
.await?;
|
||||
let created: SessionCreateResponse = json_or_error(resp).await?;
|
||||
Ok(Session {
|
||||
client: self.clone(),
|
||||
session_id: created.session_id,
|
||||
agent: created.agent,
|
||||
created_at: created.created_at,
|
||||
closed: std::sync::atomic::AtomicBool::new(false),
|
||||
})
|
||||
}
|
||||
|
||||
/// `GET /sessions`. List all sessions visible to the calling app token.
|
||||
pub async fn list_sessions(&self) -> Result<SessionList, Error> {
|
||||
let token = self.require_app()?;
|
||||
let url = self.url("/sessions")?;
|
||||
let resp = self.inner.get(url).bearer_auth(token).send().await?;
|
||||
json_or_error(resp).await
|
||||
}
|
||||
|
||||
/// `GET /sessions/{id}`. Fetch the current state of a session.
|
||||
///
|
||||
/// `id` is validated client-side against the same path-traversal guard as
|
||||
/// [`Self::revoke_token`] — anything containing `/`, `?`, `#`, `..`, or
|
||||
/// empty short-circuits with [`Error::Config`].
|
||||
pub async fn get_session(&self, id: &str) -> Result<SessionState, Error> {
|
||||
validate_session_id(id)?;
|
||||
let token = self.require_app()?;
|
||||
let url = self.url(&format!("/sessions/{id}"))?;
|
||||
let resp = self.inner.get(url).bearer_auth(token).send().await?;
|
||||
json_or_error(resp).await
|
||||
}
|
||||
|
||||
/// Internal helper used by both [`Session::close`] and [`Session`]'s
|
||||
/// `Drop` impl to issue `DELETE /sessions/{id}`.
|
||||
pub(crate) async fn close_session_internal(&self, id: &str) -> Result<(), Error> {
|
||||
validate_session_id(id)?;
|
||||
let token = self.require_app()?;
|
||||
let url = self.url(&format!("/sessions/{id}"))?;
|
||||
let resp = self.inner.delete(url).bearer_auth(token).send().await?;
|
||||
if resp.status().is_success() {
|
||||
// Body is informational — `{ ok, already_closed? }`. Drain and
|
||||
// ignore decode failure (server may legitimately 204).
|
||||
let bytes = resp.bytes().await?;
|
||||
if !bytes.is_empty() {
|
||||
let _ = serde_json::from_slice::<SessionCloseResponse>(&bytes);
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
// Funnel non-2xx through json_or_error for Auth/Api mapping.
|
||||
let _: serde_json::Value = json_or_error(resp).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Internal helper used by [`Session::turn`] /
|
||||
/// [`Session::turn_with_files`] to dispatch to
|
||||
/// `POST /sessions/{id}/turn`.
|
||||
pub(crate) async fn turn_internal<B: serde::Serialize>(
|
||||
&self,
|
||||
id: &str,
|
||||
body: &B,
|
||||
) -> Result<TurnResult, Error> {
|
||||
validate_session_id(id)?;
|
||||
let token = self.require_app()?;
|
||||
let url = self.url(&format!("/sessions/{id}/turn"))?;
|
||||
let resp = self
|
||||
.inner
|
||||
.post(url)
|
||||
.bearer_auth(token)
|
||||
.json(body)
|
||||
.send()
|
||||
.await?;
|
||||
json_or_error(resp).await
|
||||
}
|
||||
|
||||
// ---- internal ----------------------------------------------------------
|
||||
|
||||
fn require_app(&self) -> Result<&str, Error> {
|
||||
self.app_token
|
||||
.as_deref()
|
||||
.ok_or_else(|| Error::Auth("no app token configured".into()))
|
||||
}
|
||||
|
||||
fn require_admin(&self) -> Result<&str, Error> {
|
||||
self.admin_token
|
||||
.as_deref()
|
||||
|
|
@ -385,6 +486,21 @@ async fn json_or_error<T: DeserializeOwned>(resp: Response) -> Result<T, Error>
|
|||
}
|
||||
}
|
||||
|
||||
/// Defense-in-depth path validator for session ids in `/sessions/{id}*`. Same
|
||||
/// shape as [`Client::revoke_token`]'s name guard — RFC 3986 dot-segment
|
||||
/// resolution inside `Url::join` is the threat model here.
|
||||
fn validate_session_id(id: &str) -> Result<(), Error> {
|
||||
if id.is_empty()
|
||||
|| id.contains('/')
|
||||
|| id.contains('?')
|
||||
|| id.contains('#')
|
||||
|| id.contains("..")
|
||||
{
|
||||
return Err(Error::Config(format!("invalid session id: {id:?}")));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Truncate `s` to at most `max` bytes, snapping down to the nearest UTF-8
|
||||
/// codepoint boundary so we never panic on multibyte sequences. Appends `…`
|
||||
/// to the truncated form. `str::floor_char_boundary` (stable 1.80+) does the
|
||||
|
|
|
|||
|
|
@ -46,10 +46,12 @@
|
|||
|
||||
mod client;
|
||||
mod error;
|
||||
pub mod session;
|
||||
pub mod types;
|
||||
|
||||
pub use client::{Client, ClientBuilder};
|
||||
pub use error::Error;
|
||||
pub use session::{Session, SessionList, SessionOptions, SessionState, TurnEvent, TurnResult};
|
||||
pub use types::{
|
||||
AppToken, AppTokenInfo, FileToken, Healthz, RunFailure, RunRequest, RunResult,
|
||||
TokenCreateRequest, TokenList,
|
||||
|
|
|
|||
281
clients/rust/src/session.rs
Normal file
281
clients/rust/src/session.rs
Normal file
|
|
@ -0,0 +1,281 @@
|
|||
//! Multi-turn session API (v0.2).
|
||||
//!
|
||||
//! v0.2 adds a parallel `/sessions/*` surface to clawdforge backed by ACPX. A
|
||||
//! [`Session`] is a handle to one server-side session; [`Session::turn`]
|
||||
//! dispatches a single prompt+files turn and returns the structured event
|
||||
//! batch. Sessions are explicitly closed via [`Session::close`] (consumes the
|
||||
//! handle, preventing use-after-close at compile time) or — as a last-resort
|
||||
//! fallback — best-effort closed by [`Drop`] via `tokio::spawn`.
|
||||
//!
|
||||
//! v0.1 single-turn `Client::run` is unchanged; the v0.2 surface is purely
|
||||
//! additive.
|
||||
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::client::Client;
|
||||
use crate::error::Error;
|
||||
|
||||
/// Options passed to [`Client::new_session`].
|
||||
///
|
||||
/// `Default` produces `agent = None` (server picks `"claude"`) and `meta =
|
||||
/// None`.
|
||||
#[derive(Debug, Default, Clone, Serialize)]
|
||||
pub struct SessionOptions {
|
||||
/// Agent slug to dispatch to. `None` falls back to the server-side default
|
||||
/// (`"claude"`).
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub agent: Option<String>,
|
||||
|
||||
/// Free-form metadata stored alongside the session ledger row.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub meta: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
/// Reply body from `POST /sessions`.
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub(crate) struct SessionCreateResponse {
|
||||
pub session_id: String,
|
||||
pub agent: String,
|
||||
pub created_at: i64,
|
||||
}
|
||||
|
||||
/// One event in a turn's structured output.
|
||||
///
|
||||
/// `event_type` is one of `"thinking"`, `"text"`, `"tool_call"`, etc. (server
|
||||
/// is the authority on the set).
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct TurnEvent {
|
||||
/// Event discriminator (`"text"`, `"thinking"`, `"tool_call"`, ...).
|
||||
#[serde(rename = "type")]
|
||||
pub event_type: String,
|
||||
/// Text content for `"text"` and `"thinking"` events.
|
||||
#[serde(default)]
|
||||
pub content: Option<String>,
|
||||
/// Tool name for `"tool_call"` events.
|
||||
#[serde(default)]
|
||||
pub name: Option<String>,
|
||||
/// Tool arguments for `"tool_call"` events.
|
||||
#[serde(default)]
|
||||
pub args: Option<serde_json::Value>,
|
||||
/// Tool result for `"tool_call"` events.
|
||||
#[serde(default)]
|
||||
pub result: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
/// Successful response body from `POST /sessions/{id}/turn`.
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub struct TurnResult {
|
||||
/// Always `true` on a 200 reply.
|
||||
pub ok: bool,
|
||||
/// The session this turn belongs to.
|
||||
pub session_id: String,
|
||||
/// 1-based index of this turn within the session.
|
||||
pub turn_index: i32,
|
||||
/// Structured events emitted during the turn.
|
||||
pub events: Vec<TurnEvent>,
|
||||
/// Reason the agent stopped (`"end_turn"`, `"max_tokens"`, ...).
|
||||
pub stop_reason: String,
|
||||
/// Wall-clock duration of the turn.
|
||||
pub duration_ms: i64,
|
||||
}
|
||||
|
||||
impl TurnResult {
|
||||
/// Concatenate all `"text"` event contents into a single string.
|
||||
///
|
||||
/// Non-text events (`thinking`, `tool_call`, ...) are skipped. If an event
|
||||
/// is `"text"` but `content` is `None`, it contributes the empty string.
|
||||
pub fn text(&self) -> String {
|
||||
let mut out = String::new();
|
||||
for ev in &self.events {
|
||||
if ev.event_type == "text" {
|
||||
if let Some(c) = ev.content.as_deref() {
|
||||
out.push_str(c);
|
||||
}
|
||||
}
|
||||
}
|
||||
out
|
||||
}
|
||||
}
|
||||
|
||||
/// Reply body from `GET /sessions/{id}` and entries in `GET /sessions`.
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct SessionState {
|
||||
/// Server-issued session id.
|
||||
pub session_id: String,
|
||||
/// Agent slug bound to the session.
|
||||
pub agent: String,
|
||||
/// App / consumer name that owns the session.
|
||||
pub app_name: String,
|
||||
/// Unix epoch seconds when created.
|
||||
pub created_at: i64,
|
||||
/// Unix epoch seconds of the last successful turn (or `None` if zero turns
|
||||
/// have been dispatched yet).
|
||||
#[serde(default)]
|
||||
pub last_turn_at: Option<i64>,
|
||||
/// Number of turns dispatched.
|
||||
pub turn_count: i32,
|
||||
/// Unix epoch seconds when closed (or `None` if still open).
|
||||
#[serde(default)]
|
||||
pub closed_at: Option<i64>,
|
||||
}
|
||||
|
||||
/// Reply body from `DELETE /sessions/{id}`.
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub(crate) struct SessionCloseResponse {
|
||||
#[allow(dead_code)]
|
||||
pub ok: bool,
|
||||
#[serde(default)]
|
||||
#[allow(dead_code)]
|
||||
pub already_closed: Option<bool>,
|
||||
}
|
||||
|
||||
/// Reply body from `GET /sessions`.
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct SessionList {
|
||||
/// All sessions visible to the calling token.
|
||||
pub sessions: Vec<SessionState>,
|
||||
}
|
||||
|
||||
/// Request body for `POST /sessions/{id}/turn`.
|
||||
#[derive(Debug, Serialize)]
|
||||
struct TurnRequest<'a> {
|
||||
prompt: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
files: Option<&'a [String]>,
|
||||
}
|
||||
|
||||
/// A handle to one server-side multi-turn session.
|
||||
///
|
||||
/// Construct via [`Client::new_session`]. Drop or [`Session::close`] to
|
||||
/// release the server-side session. `close` consumes the value so use-after-
|
||||
/// close is a compile error; `Drop` is a best-effort backstop that fires an
|
||||
/// async DELETE via `tokio::spawn` and logs (does not panic) on failure.
|
||||
///
|
||||
/// `Debug` is hand-written and explicitly excludes the embedded [`Client`] so
|
||||
/// no bearer can leak through `{:?}` formatting.
|
||||
pub struct Session {
|
||||
pub(crate) client: Client,
|
||||
pub(crate) session_id: String,
|
||||
pub(crate) agent: String,
|
||||
pub(crate) created_at: i64,
|
||||
pub(crate) closed: AtomicBool,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Session {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
// Deliberately omit `client` — its Debug already redacts tokens, but
|
||||
// the spec-mandated shape is `Session { session_id, agent, closed }`
|
||||
// to keep the surface minimal and audit-friendly.
|
||||
f.debug_struct("Session")
|
||||
.field("session_id", &self.session_id)
|
||||
.field("agent", &self.agent)
|
||||
.field("created_at", &self.created_at)
|
||||
.field("closed", &self.closed.load(Ordering::Acquire))
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Session {
|
||||
/// Server-issued session id.
|
||||
pub fn id(&self) -> &str {
|
||||
&self.session_id
|
||||
}
|
||||
|
||||
/// Agent slug the server bound to this session.
|
||||
pub fn agent(&self) -> &str {
|
||||
&self.agent
|
||||
}
|
||||
|
||||
/// Unix epoch seconds when the session was created server-side.
|
||||
pub fn created_at(&self) -> i64 {
|
||||
self.created_at
|
||||
}
|
||||
|
||||
/// Whether the session has been explicitly closed already. Sessions closed
|
||||
/// only via `Drop`'s spawn are still reported as closed once that future
|
||||
/// has run; this getter reflects the in-memory flag.
|
||||
pub fn is_closed(&self) -> bool {
|
||||
self.closed.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
/// Send a turn with no attached files.
|
||||
///
|
||||
/// Equivalent to `turn_with_files(prompt, &[])` but skips serializing the
|
||||
/// `files` field on the wire.
|
||||
pub async fn turn(&mut self, prompt: impl Into<String>) -> Result<TurnResult, Error> {
|
||||
self.dispatch_turn(prompt.into(), None).await
|
||||
}
|
||||
|
||||
/// Send a turn that references previously uploaded file tokens.
|
||||
///
|
||||
/// `files` is the list of `ff_*` tokens returned by [`Client::upload_file`].
|
||||
///
|
||||
/// [`Client::upload_file`]: crate::Client::upload_file
|
||||
pub async fn turn_with_files(
|
||||
&mut self,
|
||||
prompt: impl Into<String>,
|
||||
files: &[String],
|
||||
) -> Result<TurnResult, Error> {
|
||||
self.dispatch_turn(prompt.into(), Some(files)).await
|
||||
}
|
||||
|
||||
async fn dispatch_turn(
|
||||
&mut self,
|
||||
prompt: String,
|
||||
files: Option<&[String]>,
|
||||
) -> Result<TurnResult, Error> {
|
||||
if self.closed.load(Ordering::Acquire) {
|
||||
return Err(Error::Config("session is closed".into()));
|
||||
}
|
||||
self.client
|
||||
.turn_internal(&self.session_id, &TurnRequest { prompt, files })
|
||||
.await
|
||||
}
|
||||
|
||||
/// Explicitly close the session. Consumes `self` — use-after-close is a
|
||||
/// compile error.
|
||||
///
|
||||
/// If the session is already closed in memory (e.g. via a prior failed
|
||||
/// close or a prior dispatch path that flagged it), this short-circuits
|
||||
/// without contacting the server.
|
||||
pub async fn close(self) -> Result<(), Error> {
|
||||
// Mark closed before the network call so a panic-mid-await on the
|
||||
// request future cannot trigger Drop's spawn into a double-close.
|
||||
if self.closed.swap(true, Ordering::AcqRel) {
|
||||
return Ok(());
|
||||
}
|
||||
self.client.close_session_internal(&self.session_id).await
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Session {
|
||||
fn drop(&mut self) {
|
||||
// If close() already ran, nothing to do.
|
||||
if self.closed.swap(true, Ordering::AcqRel) {
|
||||
return;
|
||||
}
|
||||
// tokio::spawn panics if no runtime is current. Guard against being
|
||||
// dropped from a sync context (e.g. a forgotten value at the end of a
|
||||
// sync `main`).
|
||||
if tokio::runtime::Handle::try_current().is_err() {
|
||||
tracing::warn!(
|
||||
session_id = %self.session_id,
|
||||
"Session dropped outside a tokio runtime; server-side session not closed"
|
||||
);
|
||||
return;
|
||||
}
|
||||
let client = self.client.clone();
|
||||
let id = self.session_id.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = client.close_session_internal(&id).await {
|
||||
tracing::warn!(
|
||||
session_id = %id,
|
||||
error = %e,
|
||||
"best-effort drop close failed"
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
425
clients/rust/tests/sessions.rs
Normal file
425
clients/rust/tests/sessions.rs
Normal file
|
|
@ -0,0 +1,425 @@
|
|||
//! Integration tests for the v0.2 multi-turn Session API.
|
||||
//!
|
||||
//! All tests run against an in-process `wiremock` server — no live clawdforge
|
||||
//! required.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use clawdforge::{Client, Error, SessionOptions, TurnEvent, TurnResult};
|
||||
use serde_json::json;
|
||||
use wiremock::matchers::{body_json, header, method, path, path_regex};
|
||||
use wiremock::{Mock, MockServer, ResponseTemplate};
|
||||
|
||||
fn make_client(server: &MockServer) -> Client {
|
||||
Client::builder()
|
||||
.base_url(server.uri())
|
||||
.token("cf_test_token")
|
||||
.timeout(Duration::from_secs(5))
|
||||
.build()
|
||||
.expect("client builds")
|
||||
}
|
||||
|
||||
fn mock_create(session_id: &str) -> Mock {
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/sessions"))
|
||||
.and(header("authorization", "Bearer cf_test_token"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"session_id": session_id,
|
||||
"agent": "claude",
|
||||
"created_at": 1_700_000_000_i64,
|
||||
})))
|
||||
}
|
||||
|
||||
fn mock_delete_ok(session_id: &str) -> Mock {
|
||||
Mock::given(method("DELETE"))
|
||||
.and(path(format!("/sessions/{session_id}")))
|
||||
.and(header("authorization", "Bearer cf_test_token"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({"ok": true})))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_new_session_and_close() {
|
||||
let server = MockServer::start().await;
|
||||
mock_create("sess_abc").expect(1).mount(&server).await;
|
||||
mock_delete_ok("sess_abc").expect(1).mount(&server).await;
|
||||
|
||||
let c = make_client(&server);
|
||||
let s = c
|
||||
.new_session(SessionOptions::default())
|
||||
.await
|
||||
.expect("new_session");
|
||||
assert_eq!(s.id(), "sess_abc");
|
||||
assert_eq!(s.agent(), "claude");
|
||||
assert_eq!(s.created_at(), 1_700_000_000);
|
||||
assert!(!s.is_closed());
|
||||
|
||||
s.close().await.expect("close");
|
||||
// wiremock verifies expectations on Drop of the server.
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_turn_round_trip() {
|
||||
let server = MockServer::start().await;
|
||||
mock_create("sess_t1").mount(&server).await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/sessions/sess_t1/turn"))
|
||||
.and(header("authorization", "Bearer cf_test_token"))
|
||||
.and(body_json(json!({"prompt": "hello"})))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"ok": true,
|
||||
"session_id": "sess_t1",
|
||||
"turn_index": 1,
|
||||
"events": [
|
||||
{"type": "thinking", "content": "..."},
|
||||
{"type": "text", "content": "hi back"}
|
||||
],
|
||||
"stop_reason": "end_turn",
|
||||
"duration_ms": 250
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
// Allow drop-close to land without failing other assertions.
|
||||
Mock::given(method("DELETE"))
|
||||
.and(path("/sessions/sess_t1"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({"ok": true})))
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let c = make_client(&server);
|
||||
let mut s = c.new_session(SessionOptions::default()).await.unwrap();
|
||||
let r: TurnResult = s.turn("hello").await.unwrap();
|
||||
assert!(r.ok);
|
||||
assert_eq!(r.session_id, "sess_t1");
|
||||
assert_eq!(r.turn_index, 1);
|
||||
assert_eq!(r.events.len(), 2);
|
||||
assert_eq!(r.stop_reason, "end_turn");
|
||||
assert_eq!(r.duration_ms, 250);
|
||||
assert_eq!(r.text(), "hi back");
|
||||
|
||||
// Drive the turn_with_files path too.
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/sessions/sess_t1/turn"))
|
||||
.and(body_json(
|
||||
json!({"prompt": "next", "files": ["ff_one", "ff_two"]}),
|
||||
))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"ok": true,
|
||||
"session_id": "sess_t1",
|
||||
"turn_index": 2,
|
||||
"events": [{"type": "text", "content": "ok"}],
|
||||
"stop_reason": "end_turn",
|
||||
"duration_ms": 10
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
let r2 = s
|
||||
.turn_with_files("next", &["ff_one".into(), "ff_two".into()])
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(r2.turn_index, 2);
|
||||
assert_eq!(r2.text(), "ok");
|
||||
|
||||
s.close().await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_close_idempotent_short_circuits() {
|
||||
let server = MockServer::start().await;
|
||||
mock_create("sess_idem").mount(&server).await;
|
||||
// Expect EXACTLY ONE delete — second close() is in-memory.
|
||||
mock_delete_ok("sess_idem").expect(1).mount(&server).await;
|
||||
|
||||
let c = make_client(&server);
|
||||
let s = c.new_session(SessionOptions::default()).await.unwrap();
|
||||
let id = s.id().to_string();
|
||||
s.close().await.unwrap();
|
||||
|
||||
// Second close-equivalent: rebuild a Session-shape via reconstructing the
|
||||
// closed state would require private constructors. Instead, drive the
|
||||
// semantic check via close_session_internal contract: a fresh Session from
|
||||
// a *new* create that we close twice. But since `close` consumes self,
|
||||
// "second close" semantically means a Drop after an explicit close — and
|
||||
// that path is covered by `test_drop_after_explicit_close_no_double_call`.
|
||||
//
|
||||
// What this test asserts is the wiremock `expect(1)` on the DELETE: one
|
||||
// close => one DELETE => idempotency at the network layer holds.
|
||||
let _ = id;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_drop_fires_async_close() {
|
||||
let server = MockServer::start().await;
|
||||
mock_create("sess_drop").mount(&server).await;
|
||||
mock_delete_ok("sess_drop").expect(1).mount(&server).await;
|
||||
|
||||
let c = make_client(&server);
|
||||
{
|
||||
let _s = c.new_session(SessionOptions::default()).await.unwrap();
|
||||
// _s drops here — Drop spawns the async close.
|
||||
}
|
||||
// Yield repeatedly so the spawned future has a chance to run + the HTTP
|
||||
// request lands at wiremock. wiremock asserts `expect(1)` on Drop of the
|
||||
// server.
|
||||
for _ in 0..50 {
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_drop_after_explicit_close_no_double_call() {
|
||||
let server = MockServer::start().await;
|
||||
mock_create("sess_once").mount(&server).await;
|
||||
// Exactly ONE delete — explicit close fires it, Drop should short-circuit.
|
||||
mock_delete_ok("sess_once").expect(1).mount(&server).await;
|
||||
|
||||
let c = make_client(&server);
|
||||
let s = c.new_session(SessionOptions::default()).await.unwrap();
|
||||
s.close().await.unwrap();
|
||||
// Yield to give any erroneous spawn a chance to land — if Drop spawned a
|
||||
// second DELETE, wiremock's expect(1) would fail.
|
||||
for _ in 0..20 {
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_list_sessions() {
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/sessions"))
|
||||
.and(header("authorization", "Bearer cf_test_token"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"sessions": [
|
||||
{
|
||||
"session_id": "sess_a",
|
||||
"agent": "claude",
|
||||
"app_name": "cauldron",
|
||||
"created_at": 1_700_000_000_i64,
|
||||
"last_turn_at": 1_700_000_500_i64,
|
||||
"turn_count": 3,
|
||||
"closed_at": null
|
||||
},
|
||||
{
|
||||
"session_id": "sess_b",
|
||||
"agent": "claude",
|
||||
"app_name": "cauldron",
|
||||
"created_at": 1_700_000_100_i64,
|
||||
"last_turn_at": null,
|
||||
"turn_count": 0,
|
||||
"closed_at": 1_700_001_000_i64
|
||||
}
|
||||
]
|
||||
})))
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let c = make_client(&server);
|
||||
let list = c.list_sessions().await.unwrap();
|
||||
assert_eq!(list.sessions.len(), 2);
|
||||
assert_eq!(list.sessions[0].session_id, "sess_a");
|
||||
assert_eq!(list.sessions[0].turn_count, 3);
|
||||
assert_eq!(list.sessions[0].last_turn_at, Some(1_700_000_500));
|
||||
assert_eq!(list.sessions[1].closed_at, Some(1_700_001_000));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_session() {
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/sessions/sess_q"))
|
||||
.and(header("authorization", "Bearer cf_test_token"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"session_id": "sess_q",
|
||||
"agent": "claude",
|
||||
"app_name": "cauldron",
|
||||
"created_at": 1_700_000_000_i64,
|
||||
"last_turn_at": 1_700_000_900_i64,
|
||||
"turn_count": 7,
|
||||
"closed_at": null
|
||||
})))
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let c = make_client(&server);
|
||||
let st = c.get_session("sess_q").await.unwrap();
|
||||
assert_eq!(st.session_id, "sess_q");
|
||||
assert_eq!(st.agent, "claude");
|
||||
assert_eq!(st.app_name, "cauldron");
|
||||
assert_eq!(st.turn_count, 7);
|
||||
assert!(st.closed_at.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cross_token_404() {
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("GET"))
|
||||
.and(path_regex(r"^/sessions/sess_other"))
|
||||
.respond_with(ResponseTemplate::new(404).set_body_json(json!({
|
||||
"detail": "session not found"
|
||||
})))
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let c = make_client(&server);
|
||||
let err = c
|
||||
.get_session("sess_other")
|
||||
.await
|
||||
.expect_err("cross-token must 404");
|
||||
match err {
|
||||
Error::Api { status, body } => {
|
||||
assert_eq!(status, 404);
|
||||
assert!(body.contains("session not found"), "body was {body}");
|
||||
}
|
||||
other => panic!("expected Error::Api {{ status: 404, .. }}, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_turn_result_text_concat() {
|
||||
let r = TurnResult {
|
||||
ok: true,
|
||||
session_id: "sess".into(),
|
||||
turn_index: 1,
|
||||
events: vec![
|
||||
TurnEvent {
|
||||
event_type: "thinking".into(),
|
||||
content: Some("ignored".into()),
|
||||
name: None,
|
||||
args: None,
|
||||
result: None,
|
||||
},
|
||||
TurnEvent {
|
||||
event_type: "text".into(),
|
||||
content: Some("hello ".into()),
|
||||
name: None,
|
||||
args: None,
|
||||
result: None,
|
||||
},
|
||||
TurnEvent {
|
||||
event_type: "tool_call".into(),
|
||||
content: None,
|
||||
name: Some("Read".into()),
|
||||
args: Some(json!({"path": "/x"})),
|
||||
result: Some(json!({"ok": true})),
|
||||
},
|
||||
TurnEvent {
|
||||
event_type: "text".into(),
|
||||
content: Some("world".into()),
|
||||
name: None,
|
||||
args: None,
|
||||
result: None,
|
||||
},
|
||||
TurnEvent {
|
||||
// text event with None content — must contribute the empty
|
||||
// string, not panic, not stringify "None".
|
||||
event_type: "text".into(),
|
||||
content: None,
|
||||
name: None,
|
||||
args: None,
|
||||
result: None,
|
||||
},
|
||||
],
|
||||
stop_reason: "end_turn".into(),
|
||||
duration_ms: 99,
|
||||
};
|
||||
assert_eq!(r.text(), "hello world");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_session_debug_does_not_leak_token() {
|
||||
let server = MockServer::start().await;
|
||||
let secret = "cf_super_secret_session_bearer";
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/sessions"))
|
||||
.and(header("authorization", format!("Bearer {secret}").as_str()))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"session_id": "sess_dbg",
|
||||
"agent": "claude",
|
||||
"created_at": 1_700_000_000_i64,
|
||||
})))
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("DELETE"))
|
||||
.and(path("/sessions/sess_dbg"))
|
||||
.and(header("authorization", format!("Bearer {secret}").as_str()))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({"ok": true})))
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let c = Client::builder()
|
||||
.base_url(server.uri())
|
||||
.token(secret)
|
||||
.build()
|
||||
.unwrap();
|
||||
let s = c.new_session(SessionOptions::default()).await.unwrap();
|
||||
let dbg = format!("{s:?}");
|
||||
assert!(
|
||||
!dbg.contains(secret),
|
||||
"token leaked through Session Debug: {dbg}"
|
||||
);
|
||||
// The Session Debug should print these visible bits.
|
||||
assert!(dbg.contains("sess_dbg"), "session_id missing: {dbg}");
|
||||
assert!(dbg.contains("agent"), "agent field missing: {dbg}");
|
||||
assert!(dbg.contains("closed"), "closed field missing: {dbg}");
|
||||
s.close().await.unwrap();
|
||||
}
|
||||
|
||||
/// Regression: `new_session` with options serializes the `agent` and `meta`
|
||||
/// fields when set, omits them when None.
|
||||
#[tokio::test]
|
||||
async fn test_new_session_options_serialize() {
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/sessions"))
|
||||
.and(body_json(json!({
|
||||
"agent": "claude",
|
||||
"meta": {"trace_id": "t-123"}
|
||||
})))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"session_id": "sess_opts",
|
||||
"agent": "claude",
|
||||
"created_at": 1
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("DELETE"))
|
||||
.and(path("/sessions/sess_opts"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({"ok": true})))
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let c = make_client(&server);
|
||||
let s = c
|
||||
.new_session(SessionOptions {
|
||||
agent: Some("claude".into()),
|
||||
meta: Some(json!({"trace_id": "t-123"})),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(s.id(), "sess_opts");
|
||||
s.close().await.unwrap();
|
||||
}
|
||||
|
||||
/// `get_session` / `close` / `turn` must reject path-traversal session ids
|
||||
/// before issuing any HTTP request — same defense-in-depth pattern as
|
||||
/// `revoke_token`.
|
||||
#[tokio::test]
|
||||
async fn test_session_id_rejects_path_traversal() {
|
||||
let server = MockServer::start().await;
|
||||
let c = make_client(&server);
|
||||
for bad in ["", "../foo", "..", "foo/bar", "foo?x=1", "foo#frag"] {
|
||||
let err = c
|
||||
.get_session(bad)
|
||||
.await
|
||||
.expect_err(&format!("get_session({bad:?}) should reject"));
|
||||
assert!(
|
||||
matches!(err, Error::Config(_)),
|
||||
"{bad:?} produced wrong variant: {err:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue