From ef1d8fdd31a10bbdc4b57cbfa59f760f0eef09b2 Mon Sep 17 00:00:00 2001 From: Kayos Date: Thu, 14 May 2026 08:03:52 -0700 Subject: [PATCH] port query() and Client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two entry points mirroring the Python SDK's surface: - query(prompt, options) returns an impl Stream> that terminates after the CLI emits its terminal result message. The stream owns the underlying Client and tears down the subprocess via a spawned disconnect task either on Result observation or on Drop. - Client (mirror of ClaudeSDKClient) supports bidirectional multi-turn sessions: connect, send (or send_raw for tool-result frames), drain the messages stream, repeat. Drop is intentionally a no-op for the subprocess — callers should call disconnect() for a clean shutdown that surfaces non-zero exit codes as Error::Process. lib.rs re-exports the public API and carries the crate-level docs + quick-start example. The v0.1 / v0.2 split is documented inline: control protocol (interrupt, set_permission_mode, etc.), can_use_tool, in-process MCP servers, HookMatcher, SessionStore, sandbox, plugins, and the agents dataclass are all deferred. --- src/client.rs | 220 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 132 ++++++++++++++++++++++++++++++ src/query.rs | 140 ++++++++++++++++++++++++++++++++ 3 files changed, 492 insertions(+) create mode 100644 src/client.rs create mode 100644 src/lib.rs create mode 100644 src/query.rs diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..f638d95 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,220 @@ +//! [`Client`] — bidirectional, multi-turn session with Claude Code. +//! +//! Mirrors the Python SDK's `ClaudeSDKClient`. Use [`Client::connect`] to +//! spawn the CLI, [`Client::send`] to push user messages, and +//! [`Client::messages`] to receive a stream of [`Message`]s. +//! +//! ```no_run +//! use claude_agent_sdk::{Client, ClaudeAgentOptions, Message}; +//! use tokio_stream::StreamExt; +//! +//! # async fn run() -> claude_agent_sdk::Result<()> { +//! let mut client = Client::new(ClaudeAgentOptions::new()).await?; +//! client.connect().await?; +//! client.send("Hello!").await?; +//! let mut stream = client.messages(); +//! while let Some(msg) = stream.next().await { +//! let msg = msg?; +//! if msg.is_result() { +//! break; +//! } +//! } +//! client.disconnect().await?; +//! # Ok(()) } +//! ``` +//! +//! # v0.1 limitations +//! +//! - No control protocol — `interrupt()`, `set_permission_mode()`, +//! `set_model()`, and other live-mutation methods are deferred to v0.2. +//! - The Python SDK sends an `initialize` control request before the first +//! user message; this SDK skips that step because v0.1 doesn't surface +//! hooks or agents-via-initialize. Newer CLI versions still operate +//! correctly without it. + +use futures_core::Stream; +use serde_json::json; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; + +use crate::errors::{Error, Result}; +use crate::messages::Message; +use crate::options::ClaudeAgentOptions; +use crate::transport::{SubprocessTransport, TransportHandle, TransportWriter}; + +/// Buffer depth on the inbound message channel — picked to match typical CLI +/// burst size for one turn. +const MESSAGE_CHANNEL_BUFFER: usize = 64; + +/// A bidirectional client for the Claude CLI. +/// +/// Lifecycle: [`new`](Self::new) → [`connect`](Self::connect) → one or more +/// [`send`](Self::send) calls interleaved with consumption of +/// [`messages`](Self::messages) → [`disconnect`](Self::disconnect). +/// +/// `Drop` does NOT kill the subprocess — call `disconnect()` for a clean +/// shutdown. (Killing on drop would leak a tokio task; the `TransportHandle` +/// drop still aborts the child process if the runtime allows it.) +pub struct Client { + options: ClaudeAgentOptions, + writer: Option, + handle: Option, + reader_task: Option>, + rx: Option>>, + session_id: String, + disconnected: bool, +} + +impl Client { + /// Build a client from options. Resolves the CLI binary path eagerly. + pub async fn new(options: ClaudeAgentOptions) -> Result { + // Eager path resolution: catches CliNotFound before spawn. + let _ = SubprocessTransport::new(options.clone())?; + Ok(Self { + options, + writer: None, + handle: None, + reader_task: None, + rx: None, + session_id: "default".into(), + disconnected: false, + }) + } + + /// Override the session ID stamped on outgoing user messages. Defaults + /// to `"default"`. + pub fn with_session_id(mut self, id: impl Into) -> Self { + self.session_id = id.into(); + self + } + + /// Spawn the CLI subprocess and start the reader task. Idempotent — a + /// second call after a successful connect is a no-op. + pub async fn connect(&mut self) -> Result<()> { + if self.writer.is_some() { + return Ok(()); + } + let transport = SubprocessTransport::new(self.options.clone())?; + let (mut reader, writer, handle) = transport.connect().await?; + + let (tx, rx) = mpsc::channel::>(MESSAGE_CHANNEL_BUFFER); + let reader_task = tokio::spawn(async move { + loop { + match reader.read_frame().await { + Ok(Some(value)) => { + let parsed: Result = serde_json::from_value(value.clone()) + .map_err(|e| { + Error::parse_with_data( + format!("could not parse CLI frame: {e}"), + value, + ) + }); + if tx.send(parsed).await.is_err() { + break; + } + } + Ok(None) => break, + Err(e) => { + let _ = tx.send(Err(e)).await; + break; + } + } + } + }); + + self.writer = Some(writer); + self.handle = Some(handle); + self.reader_task = Some(reader_task); + self.rx = Some(rx); + Ok(()) + } + + /// Send a plain user message. Each call appends one JSON frame: + /// + /// ```json + /// {"type": "user", "message": {"role": "user", "content": ""}, + /// "parent_tool_use_id": null, "session_id": ""} + /// ``` + /// + /// The client must be connected — call [`connect`](Self::connect) first. + pub async fn send(&self, prompt: impl Into) -> Result<()> { + let writer = self + .writer + .as_ref() + .ok_or_else(|| Error::conn("not connected; call connect() first"))?; + let frame = json!({ + "type": "user", + "message": {"role": "user", "content": prompt.into()}, + "parent_tool_use_id": serde_json::Value::Null, + "session_id": self.session_id, + }); + writer.write_frame(&frame.to_string()).await + } + + /// Send a raw user-message JSON frame. Use this when you need to push + /// structured content blocks (tool results, etc.). The caller is + /// responsible for shape — the frame is forwarded verbatim modulo a + /// trailing newline and a stamped `session_id` if absent. + pub async fn send_raw(&self, mut frame: serde_json::Value) -> Result<()> { + let writer = self + .writer + .as_ref() + .ok_or_else(|| Error::conn("not connected; call connect() first"))?; + if let serde_json::Value::Object(map) = &mut frame { + map.entry("session_id".to_string()) + .or_insert(serde_json::Value::String(self.session_id.clone())); + } + writer.write_frame(&frame.to_string()).await + } + + /// Take the receiver as a [`Stream`]. May only be called once per + /// `connect()` — subsequent calls return an empty stream. + pub fn messages(&mut self) -> impl Stream> + Send + 'static + use<> { + match self.rx.take() { + Some(rx) => ReceiverStream::new(rx), + None => { + let (_, rx) = mpsc::channel(1); + ReceiverStream::new(rx) + } + } + } + + /// Close stdin and wait for the subprocess to exit. Idempotent. + /// + /// Returns the subprocess exit error as [`Error::Process`] when the CLI + /// exited non-zero (matches the Python SDK). + pub async fn disconnect(&mut self) -> Result<()> { + if self.disconnected { + return Ok(()); + } + self.disconnected = true; + // Drop the receiver first so the reader sees a closed channel and + // exits when stdout EOFs. + drop(self.rx.take()); + // Close stdin via the writer to let the CLI shut down cleanly. + if let Some(writer) = self.writer.take() { + writer.end_input().await; + } + // Now await the reader task — it exits on stdout EOF (since stdin is + // closed, the CLI will reach end-of-input and exit, which closes + // stdout from its end). + if let Some(task) = self.reader_task.take() { + let _ = task.await; + } + // Finally, reap the subprocess. + if let Some(handle) = self.handle.take() { + handle.close().await?; + } + Ok(()) + } +} + +impl Drop for Client { + fn drop(&mut self) { + // The TransportHandle's Drop will start_kill() the child if it's + // still alive; the reader task will then exit on its own. We don't + // explicitly tear down here because async cleanup on Drop is not + // ergonomic in Rust — callers should use disconnect() for clean + // shutdown. + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..c90f8b6 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,132 @@ +//! Async Rust SDK for the Claude Agent CLI. +//! +//! This crate is a Rust port of the official Python +//! [`claude-agent-sdk`](https://github.com/anthropics/claude-agent-sdk-python). +//! It wraps the `claude` CLI as a subprocess and exposes its +//! newline-delimited JSON stream as ergonomic Rust types. +//! +//! # Quick start +//! +//! ```no_run +//! use claude_agent_sdk::{query, ClaudeAgentOptions, Message, ContentBlock}; +//! use tokio_stream::StreamExt; +//! +//! #[tokio::main] +//! async fn main() -> claude_agent_sdk::Result<()> { +//! let opts = ClaudeAgentOptions::new() +//! .with_system_prompt("You are a helpful assistant.") +//! .with_max_turns(1); +//! +//! let mut stream = query("What is 2 + 2?", opts).await?; +//! +//! while let Some(msg) = stream.next().await { +//! match msg? { +//! Message::Assistant(a) => { +//! for block in a.message.content { +//! if let ContentBlock::Text(t) = block { +//! println!("Claude: {}", t.text); +//! } +//! } +//! } +//! Message::Result(r) => { +//! if let Some(usd) = r.total_cost_usd { +//! println!("Cost: ${:.4}", usd); +//! } +//! break; +//! } +//! _ => {} +//! } +//! } +//! Ok(()) +//! } +//! ``` +//! +//! # Two entry points +//! +//! - [`query`] — one-shot prompts. Returns a stream that ends when the CLI +//! emits its terminal `result` message, automatically tearing down the +//! subprocess. +//! - [`Client`] — bidirectional, multi-turn sessions. Lets you send +//! follow-up prompts in response to assistant messages, mirroring the +//! Python SDK's `ClaudeSDKClient`. +//! +//! # Configuration +//! +//! Configure both entry points via [`ClaudeAgentOptions`]. It uses a builder +//! pattern — chain `.with_*()` methods on a default value: +//! +//! ``` +//! use claude_agent_sdk::{ClaudeAgentOptions, PermissionMode}; +//! +//! let opts = ClaudeAgentOptions::new() +//! .with_model("claude-sonnet-4-5") +//! .with_permission_mode(PermissionMode::AcceptEdits) +//! .with_allowed_tool("Read") +//! .with_allowed_tool("Bash") +//! .with_cwd("/tmp/my-project") +//! .with_max_turns(5); +//! ``` +//! +//! See [`ClaudeAgentOptions`] for the full list of supported fields. +//! +//! # Field naming +//! +//! The CLI wire protocol is `snake_case`, so deserialized field names match +//! the Python SDK directly (`session_id`, `total_cost_usd`, `tool_use_id`, +//! etc.). Where the wire used `camelCase` historically — `modelUsage` — we +//! preserve it via `#[serde(rename = "...")]` rather than blanket-renaming +//! the container. +//! +//! # v0.1 scope and known limitations +//! +//! The v0.1 port covers the core path of the Python SDK: +//! +//! - Subprocess transport with newline-delimited JSON framing. +//! - All message and content-block types parsed faithfully. +//! - `query()` + `Client` for one-shot and bidirectional use. +//! - The full `ClaudeAgentOptions` flag surface relevant to subprocess args. +//! +//! Deferred to v0.2 — see the upstream README for the current shape of these +//! features in Python: +//! +//! - **Control protocol over JSON-RPC**: `interrupt()`, `set_permission_mode()`, +//! `set_model()`, `get_mcp_status()`, etc. The Rust [`Client`] today only +//! speaks the bare user / assistant / result frames. +//! - **`can_use_tool` permission callback**: requires the control protocol. +//! - **`@tool` decorator / `create_sdk_mcp_server()`**: in-process MCP +//! servers — needs a derive macro or trait shape, deferred until +//! downstream demand is clearer. +//! - **`HookMatcher`** and hook callbacks: the wire format is supported +//! (initialize-payload + hook_callback responses) but the Rust callback +//! surface is not designed yet. +//! - **`SessionStore`** mirroring adapter trait. +//! - **`Sandbox` settings**, **plugins**, **agents** dataclass. +//! +//! These all degrade gracefully — the CLI ignores absent stdin frames, so +//! a v0.1 caller using a `claude` build that supports control requests will +//! simply see fewer features than the Python SDK exposes. + +#![deny(missing_docs)] +#![cfg_attr(docsrs, feature(doc_auto_cfg))] + +mod client; +mod errors; +mod messages; +mod options; +mod query; +mod transport; + +pub use client::Client; +pub use errors::{Error, Result}; +pub use messages::{ + AssistantMessage, AssistantMessageInner, ContentBlock, Message, ResultMessage, + ServerToolResultBlock, ServerToolUseBlock, StreamEventMessage, SystemMessage, TextBlock, + ThinkingBlock, ToolResultBlock, ToolUseBlock, UserContent, UserMessage, UserMessageInner, +}; +pub use options::{ClaudeAgentOptions, Effort, McpServersConfig, PermissionMode, SystemPrompt}; +pub use query::query; +pub use transport::SubprocessTransport; + +/// Crate version, as set in `Cargo.toml`. Sent to the CLI as +/// `CLAUDE_AGENT_SDK_VERSION` in the subprocess env. +pub const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/src/query.rs b/src/query.rs new file mode 100644 index 0000000..bf09f7c --- /dev/null +++ b/src/query.rs @@ -0,0 +1,140 @@ +//! [`query`] — one-shot fire-and-forget interaction with Claude Code. +//! +//! Mirror of the Python SDK's `query()` function. Returns a [`Stream`] of +//! messages and shuts down the underlying subprocess once the stream is +//! exhausted or dropped. +//! +//! ```no_run +//! use claude_agent_sdk::{query, ClaudeAgentOptions, Message}; +//! use tokio_stream::StreamExt; +//! +//! # async fn run() -> claude_agent_sdk::Result<()> { +//! let mut stream = query("What is 2 + 2?", ClaudeAgentOptions::new()).await?; +//! while let Some(msg) = stream.next().await { +//! match msg? { +//! Message::Assistant(a) => println!("{:?}", a.message.content), +//! Message::Result(_) => break, +//! _ => {} +//! } +//! } +//! # Ok(()) } +//! ``` + +use futures_core::Stream; + +use crate::client::Client; +use crate::errors::Result; +use crate::messages::Message; +use crate::options::ClaudeAgentOptions; + +/// Send a single prompt and return a stream of messages until the terminal +/// `result` message arrives, then shut down the subprocess. +/// +/// The returned stream automatically calls [`Client::disconnect`] when the +/// terminal [`Message::Result`] is observed (or when the stream is dropped), +/// so callers don't need to manage the subprocess lifecycle by hand. +/// +/// # Errors +/// +/// - [`crate::Error::CliNotFound`] when the `claude` binary can't be found. +/// - [`crate::Error::CliConnection`] when the subprocess fails to spawn. +/// - Per-message [`crate::Error::MessageParse`] or +/// [`crate::Error::JsonDecode`] when the CLI emits an unparseable frame +/// mid-stream. These are emitted as `Err` items in the stream, not failures +/// at `query()` call time. +/// +/// [`Client::disconnect`]: crate::Client::disconnect +pub async fn query( + prompt: impl Into, + options: ClaudeAgentOptions, +) -> Result> + Send + 'static> { + let prompt = prompt.into(); + let mut client = Client::new(options).await?; + client.connect().await?; + client.send(&prompt).await?; + let inner = client.messages(); + Ok(StopAfterResult::new(inner, client)) +} + +/// Stream adapter that owns a [`Client`] and stops yielding after the first +/// [`Message::Result`], draining and disconnecting the client as part of +/// shutdown. +struct StopAfterResult { + inner: S, + client: Option, + finished: bool, +} + +impl StopAfterResult +where + S: Stream> + Unpin + Send, +{ + fn new(inner: S, client: Client) -> Self { + Self { + inner, + client: Some(client), + finished: false, + } + } +} + +impl Stream for StopAfterResult +where + S: Stream> + Unpin + Send, +{ + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + if self.finished { + return std::task::Poll::Ready(None); + } + match std::pin::Pin::new(&mut self.inner).poll_next(cx) { + std::task::Poll::Pending => std::task::Poll::Pending, + std::task::Poll::Ready(None) => { + self.finished = true; + self.spawn_disconnect(); + std::task::Poll::Ready(None) + } + std::task::Poll::Ready(Some(item)) => { + let stop = matches!(&item, Ok(m) if m.is_result()); + if stop { + self.finished = true; + self.spawn_disconnect(); + } + std::task::Poll::Ready(Some(item)) + } + } + } +} + +impl StopAfterResult { + fn spawn_disconnect(&mut self) { + if let Some(mut client) = self.client.take() { + tokio::spawn(async move { + if let Err(e) = client.disconnect().await { + tracing::debug!("query subprocess disconnect error: {e}"); + } + }); + } + } +} + +impl Drop for StopAfterResult { + fn drop(&mut self) { + // If the consumer dropped the stream early (before a Result message), + // tear down the subprocess too. spawn() into the active runtime; if + // there is no runtime, the underlying SubprocessTransport::Drop + // still kicks the child. + if let Some(mut client) = self.client.take() { + if tokio::runtime::Handle::try_current().is_ok() { + tokio::spawn(async move { + let _ = client.disconnect().await; + }); + } + } + } +} +