port query() and Client
Two entry points mirroring the Python SDK's surface: - query(prompt, options) returns an impl Stream<Item = Result<Message>> that terminates after the CLI emits its terminal result message. The stream owns the underlying Client and tears down the subprocess via a spawned disconnect task either on Result observation or on Drop. - Client (mirror of ClaudeSDKClient) supports bidirectional multi-turn sessions: connect, send (or send_raw for tool-result frames), drain the messages stream, repeat. Drop is intentionally a no-op for the subprocess — callers should call disconnect() for a clean shutdown that surfaces non-zero exit codes as Error::Process. lib.rs re-exports the public API and carries the crate-level docs + quick-start example. The v0.1 / v0.2 split is documented inline: control protocol (interrupt, set_permission_mode, etc.), can_use_tool, in-process MCP servers, HookMatcher, SessionStore, sandbox, plugins, and the agents dataclass are all deferred.
This commit is contained in:
parent
ba189340e5
commit
ef1d8fdd31
3 changed files with 492 additions and 0 deletions
220
src/client.rs
Normal file
220
src/client.rs
Normal file
|
|
@ -0,0 +1,220 @@
|
||||||
|
//! [`Client`] — bidirectional, multi-turn session with Claude Code.
|
||||||
|
//!
|
||||||
|
//! Mirrors the Python SDK's `ClaudeSDKClient`. Use [`Client::connect`] to
|
||||||
|
//! spawn the CLI, [`Client::send`] to push user messages, and
|
||||||
|
//! [`Client::messages`] to receive a stream of [`Message`]s.
|
||||||
|
//!
|
||||||
|
//! ```no_run
|
||||||
|
//! use claude_agent_sdk::{Client, ClaudeAgentOptions, Message};
|
||||||
|
//! use tokio_stream::StreamExt;
|
||||||
|
//!
|
||||||
|
//! # async fn run() -> claude_agent_sdk::Result<()> {
|
||||||
|
//! let mut client = Client::new(ClaudeAgentOptions::new()).await?;
|
||||||
|
//! client.connect().await?;
|
||||||
|
//! client.send("Hello!").await?;
|
||||||
|
//! let mut stream = client.messages();
|
||||||
|
//! while let Some(msg) = stream.next().await {
|
||||||
|
//! let msg = msg?;
|
||||||
|
//! if msg.is_result() {
|
||||||
|
//! break;
|
||||||
|
//! }
|
||||||
|
//! }
|
||||||
|
//! client.disconnect().await?;
|
||||||
|
//! # Ok(()) }
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! # v0.1 limitations
|
||||||
|
//!
|
||||||
|
//! - No control protocol — `interrupt()`, `set_permission_mode()`,
|
||||||
|
//! `set_model()`, and other live-mutation methods are deferred to v0.2.
|
||||||
|
//! - The Python SDK sends an `initialize` control request before the first
|
||||||
|
//! user message; this SDK skips that step because v0.1 doesn't surface
|
||||||
|
//! hooks or agents-via-initialize. Newer CLI versions still operate
|
||||||
|
//! correctly without it.
|
||||||
|
|
||||||
|
use futures_core::Stream;
|
||||||
|
use serde_json::json;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
|
|
||||||
|
use crate::errors::{Error, Result};
|
||||||
|
use crate::messages::Message;
|
||||||
|
use crate::options::ClaudeAgentOptions;
|
||||||
|
use crate::transport::{SubprocessTransport, TransportHandle, TransportWriter};
|
||||||
|
|
||||||
|
/// Buffer depth on the inbound message channel — picked to match typical CLI
|
||||||
|
/// burst size for one turn.
|
||||||
|
const MESSAGE_CHANNEL_BUFFER: usize = 64;
|
||||||
|
|
||||||
|
/// A bidirectional client for the Claude CLI.
|
||||||
|
///
|
||||||
|
/// Lifecycle: [`new`](Self::new) → [`connect`](Self::connect) → one or more
|
||||||
|
/// [`send`](Self::send) calls interleaved with consumption of
|
||||||
|
/// [`messages`](Self::messages) → [`disconnect`](Self::disconnect).
|
||||||
|
///
|
||||||
|
/// `Drop` does NOT kill the subprocess — call `disconnect()` for a clean
|
||||||
|
/// shutdown. (Killing on drop would leak a tokio task; the `TransportHandle`
|
||||||
|
/// drop still aborts the child process if the runtime allows it.)
|
||||||
|
pub struct Client {
|
||||||
|
options: ClaudeAgentOptions,
|
||||||
|
writer: Option<TransportWriter>,
|
||||||
|
handle: Option<TransportHandle>,
|
||||||
|
reader_task: Option<tokio::task::JoinHandle<()>>,
|
||||||
|
rx: Option<mpsc::Receiver<Result<Message>>>,
|
||||||
|
session_id: String,
|
||||||
|
disconnected: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Client {
|
||||||
|
/// Build a client from options. Resolves the CLI binary path eagerly.
|
||||||
|
pub async fn new(options: ClaudeAgentOptions) -> Result<Self> {
|
||||||
|
// Eager path resolution: catches CliNotFound before spawn.
|
||||||
|
let _ = SubprocessTransport::new(options.clone())?;
|
||||||
|
Ok(Self {
|
||||||
|
options,
|
||||||
|
writer: None,
|
||||||
|
handle: None,
|
||||||
|
reader_task: None,
|
||||||
|
rx: None,
|
||||||
|
session_id: "default".into(),
|
||||||
|
disconnected: false,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Override the session ID stamped on outgoing user messages. Defaults
|
||||||
|
/// to `"default"`.
|
||||||
|
pub fn with_session_id(mut self, id: impl Into<String>) -> Self {
|
||||||
|
self.session_id = id.into();
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spawn the CLI subprocess and start the reader task. Idempotent — a
|
||||||
|
/// second call after a successful connect is a no-op.
|
||||||
|
pub async fn connect(&mut self) -> Result<()> {
|
||||||
|
if self.writer.is_some() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
let transport = SubprocessTransport::new(self.options.clone())?;
|
||||||
|
let (mut reader, writer, handle) = transport.connect().await?;
|
||||||
|
|
||||||
|
let (tx, rx) = mpsc::channel::<Result<Message>>(MESSAGE_CHANNEL_BUFFER);
|
||||||
|
let reader_task = tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
match reader.read_frame().await {
|
||||||
|
Ok(Some(value)) => {
|
||||||
|
let parsed: Result<Message> = serde_json::from_value(value.clone())
|
||||||
|
.map_err(|e| {
|
||||||
|
Error::parse_with_data(
|
||||||
|
format!("could not parse CLI frame: {e}"),
|
||||||
|
value,
|
||||||
|
)
|
||||||
|
});
|
||||||
|
if tx.send(parsed).await.is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(None) => break,
|
||||||
|
Err(e) => {
|
||||||
|
let _ = tx.send(Err(e)).await;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
self.writer = Some(writer);
|
||||||
|
self.handle = Some(handle);
|
||||||
|
self.reader_task = Some(reader_task);
|
||||||
|
self.rx = Some(rx);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a plain user message. Each call appends one JSON frame:
|
||||||
|
///
|
||||||
|
/// ```json
|
||||||
|
/// {"type": "user", "message": {"role": "user", "content": "<prompt>"},
|
||||||
|
/// "parent_tool_use_id": null, "session_id": "<id>"}
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
/// The client must be connected — call [`connect`](Self::connect) first.
|
||||||
|
pub async fn send(&self, prompt: impl Into<String>) -> Result<()> {
|
||||||
|
let writer = self
|
||||||
|
.writer
|
||||||
|
.as_ref()
|
||||||
|
.ok_or_else(|| Error::conn("not connected; call connect() first"))?;
|
||||||
|
let frame = json!({
|
||||||
|
"type": "user",
|
||||||
|
"message": {"role": "user", "content": prompt.into()},
|
||||||
|
"parent_tool_use_id": serde_json::Value::Null,
|
||||||
|
"session_id": self.session_id,
|
||||||
|
});
|
||||||
|
writer.write_frame(&frame.to_string()).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a raw user-message JSON frame. Use this when you need to push
|
||||||
|
/// structured content blocks (tool results, etc.). The caller is
|
||||||
|
/// responsible for shape — the frame is forwarded verbatim modulo a
|
||||||
|
/// trailing newline and a stamped `session_id` if absent.
|
||||||
|
pub async fn send_raw(&self, mut frame: serde_json::Value) -> Result<()> {
|
||||||
|
let writer = self
|
||||||
|
.writer
|
||||||
|
.as_ref()
|
||||||
|
.ok_or_else(|| Error::conn("not connected; call connect() first"))?;
|
||||||
|
if let serde_json::Value::Object(map) = &mut frame {
|
||||||
|
map.entry("session_id".to_string())
|
||||||
|
.or_insert(serde_json::Value::String(self.session_id.clone()));
|
||||||
|
}
|
||||||
|
writer.write_frame(&frame.to_string()).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Take the receiver as a [`Stream`]. May only be called once per
|
||||||
|
/// `connect()` — subsequent calls return an empty stream.
|
||||||
|
pub fn messages(&mut self) -> impl Stream<Item = Result<Message>> + Send + 'static + use<> {
|
||||||
|
match self.rx.take() {
|
||||||
|
Some(rx) => ReceiverStream::new(rx),
|
||||||
|
None => {
|
||||||
|
let (_, rx) = mpsc::channel(1);
|
||||||
|
ReceiverStream::new(rx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Close stdin and wait for the subprocess to exit. Idempotent.
|
||||||
|
///
|
||||||
|
/// Returns the subprocess exit error as [`Error::Process`] when the CLI
|
||||||
|
/// exited non-zero (matches the Python SDK).
|
||||||
|
pub async fn disconnect(&mut self) -> Result<()> {
|
||||||
|
if self.disconnected {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
self.disconnected = true;
|
||||||
|
// Drop the receiver first so the reader sees a closed channel and
|
||||||
|
// exits when stdout EOFs.
|
||||||
|
drop(self.rx.take());
|
||||||
|
// Close stdin via the writer to let the CLI shut down cleanly.
|
||||||
|
if let Some(writer) = self.writer.take() {
|
||||||
|
writer.end_input().await;
|
||||||
|
}
|
||||||
|
// Now await the reader task — it exits on stdout EOF (since stdin is
|
||||||
|
// closed, the CLI will reach end-of-input and exit, which closes
|
||||||
|
// stdout from its end).
|
||||||
|
if let Some(task) = self.reader_task.take() {
|
||||||
|
let _ = task.await;
|
||||||
|
}
|
||||||
|
// Finally, reap the subprocess.
|
||||||
|
if let Some(handle) = self.handle.take() {
|
||||||
|
handle.close().await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for Client {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
// The TransportHandle's Drop will start_kill() the child if it's
|
||||||
|
// still alive; the reader task will then exit on its own. We don't
|
||||||
|
// explicitly tear down here because async cleanup on Drop is not
|
||||||
|
// ergonomic in Rust — callers should use disconnect() for clean
|
||||||
|
// shutdown.
|
||||||
|
}
|
||||||
|
}
|
||||||
132
src/lib.rs
Normal file
132
src/lib.rs
Normal file
|
|
@ -0,0 +1,132 @@
|
||||||
|
//! Async Rust SDK for the Claude Agent CLI.
|
||||||
|
//!
|
||||||
|
//! This crate is a Rust port of the official Python
|
||||||
|
//! [`claude-agent-sdk`](https://github.com/anthropics/claude-agent-sdk-python).
|
||||||
|
//! It wraps the `claude` CLI as a subprocess and exposes its
|
||||||
|
//! newline-delimited JSON stream as ergonomic Rust types.
|
||||||
|
//!
|
||||||
|
//! # Quick start
|
||||||
|
//!
|
||||||
|
//! ```no_run
|
||||||
|
//! use claude_agent_sdk::{query, ClaudeAgentOptions, Message, ContentBlock};
|
||||||
|
//! use tokio_stream::StreamExt;
|
||||||
|
//!
|
||||||
|
//! #[tokio::main]
|
||||||
|
//! async fn main() -> claude_agent_sdk::Result<()> {
|
||||||
|
//! let opts = ClaudeAgentOptions::new()
|
||||||
|
//! .with_system_prompt("You are a helpful assistant.")
|
||||||
|
//! .with_max_turns(1);
|
||||||
|
//!
|
||||||
|
//! let mut stream = query("What is 2 + 2?", opts).await?;
|
||||||
|
//!
|
||||||
|
//! while let Some(msg) = stream.next().await {
|
||||||
|
//! match msg? {
|
||||||
|
//! Message::Assistant(a) => {
|
||||||
|
//! for block in a.message.content {
|
||||||
|
//! if let ContentBlock::Text(t) = block {
|
||||||
|
//! println!("Claude: {}", t.text);
|
||||||
|
//! }
|
||||||
|
//! }
|
||||||
|
//! }
|
||||||
|
//! Message::Result(r) => {
|
||||||
|
//! if let Some(usd) = r.total_cost_usd {
|
||||||
|
//! println!("Cost: ${:.4}", usd);
|
||||||
|
//! }
|
||||||
|
//! break;
|
||||||
|
//! }
|
||||||
|
//! _ => {}
|
||||||
|
//! }
|
||||||
|
//! }
|
||||||
|
//! Ok(())
|
||||||
|
//! }
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! # Two entry points
|
||||||
|
//!
|
||||||
|
//! - [`query`] — one-shot prompts. Returns a stream that ends when the CLI
|
||||||
|
//! emits its terminal `result` message, automatically tearing down the
|
||||||
|
//! subprocess.
|
||||||
|
//! - [`Client`] — bidirectional, multi-turn sessions. Lets you send
|
||||||
|
//! follow-up prompts in response to assistant messages, mirroring the
|
||||||
|
//! Python SDK's `ClaudeSDKClient`.
|
||||||
|
//!
|
||||||
|
//! # Configuration
|
||||||
|
//!
|
||||||
|
//! Configure both entry points via [`ClaudeAgentOptions`]. It uses a builder
|
||||||
|
//! pattern — chain `.with_*()` methods on a default value:
|
||||||
|
//!
|
||||||
|
//! ```
|
||||||
|
//! use claude_agent_sdk::{ClaudeAgentOptions, PermissionMode};
|
||||||
|
//!
|
||||||
|
//! let opts = ClaudeAgentOptions::new()
|
||||||
|
//! .with_model("claude-sonnet-4-5")
|
||||||
|
//! .with_permission_mode(PermissionMode::AcceptEdits)
|
||||||
|
//! .with_allowed_tool("Read")
|
||||||
|
//! .with_allowed_tool("Bash")
|
||||||
|
//! .with_cwd("/tmp/my-project")
|
||||||
|
//! .with_max_turns(5);
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! See [`ClaudeAgentOptions`] for the full list of supported fields.
|
||||||
|
//!
|
||||||
|
//! # Field naming
|
||||||
|
//!
|
||||||
|
//! The CLI wire protocol is `snake_case`, so deserialized field names match
|
||||||
|
//! the Python SDK directly (`session_id`, `total_cost_usd`, `tool_use_id`,
|
||||||
|
//! etc.). Where the wire used `camelCase` historically — `modelUsage` — we
|
||||||
|
//! preserve it via `#[serde(rename = "...")]` rather than blanket-renaming
|
||||||
|
//! the container.
|
||||||
|
//!
|
||||||
|
//! # v0.1 scope and known limitations
|
||||||
|
//!
|
||||||
|
//! The v0.1 port covers the core path of the Python SDK:
|
||||||
|
//!
|
||||||
|
//! - Subprocess transport with newline-delimited JSON framing.
|
||||||
|
//! - All message and content-block types parsed faithfully.
|
||||||
|
//! - `query()` + `Client` for one-shot and bidirectional use.
|
||||||
|
//! - The full `ClaudeAgentOptions` flag surface relevant to subprocess args.
|
||||||
|
//!
|
||||||
|
//! Deferred to v0.2 — see the upstream README for the current shape of these
|
||||||
|
//! features in Python:
|
||||||
|
//!
|
||||||
|
//! - **Control protocol over JSON-RPC**: `interrupt()`, `set_permission_mode()`,
|
||||||
|
//! `set_model()`, `get_mcp_status()`, etc. The Rust [`Client`] today only
|
||||||
|
//! speaks the bare user / assistant / result frames.
|
||||||
|
//! - **`can_use_tool` permission callback**: requires the control protocol.
|
||||||
|
//! - **`@tool` decorator / `create_sdk_mcp_server()`**: in-process MCP
|
||||||
|
//! servers — needs a derive macro or trait shape, deferred until
|
||||||
|
//! downstream demand is clearer.
|
||||||
|
//! - **`HookMatcher`** and hook callbacks: the wire format is supported
|
||||||
|
//! (initialize-payload + hook_callback responses) but the Rust callback
|
||||||
|
//! surface is not designed yet.
|
||||||
|
//! - **`SessionStore`** mirroring adapter trait.
|
||||||
|
//! - **`Sandbox` settings**, **plugins**, **agents** dataclass.
|
||||||
|
//!
|
||||||
|
//! These all degrade gracefully — the CLI ignores absent stdin frames, so
|
||||||
|
//! a v0.1 caller using a `claude` build that supports control requests will
|
||||||
|
//! simply see fewer features than the Python SDK exposes.
|
||||||
|
|
||||||
|
#![deny(missing_docs)]
|
||||||
|
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
|
||||||
|
|
||||||
|
mod client;
|
||||||
|
mod errors;
|
||||||
|
mod messages;
|
||||||
|
mod options;
|
||||||
|
mod query;
|
||||||
|
mod transport;
|
||||||
|
|
||||||
|
pub use client::Client;
|
||||||
|
pub use errors::{Error, Result};
|
||||||
|
pub use messages::{
|
||||||
|
AssistantMessage, AssistantMessageInner, ContentBlock, Message, ResultMessage,
|
||||||
|
ServerToolResultBlock, ServerToolUseBlock, StreamEventMessage, SystemMessage, TextBlock,
|
||||||
|
ThinkingBlock, ToolResultBlock, ToolUseBlock, UserContent, UserMessage, UserMessageInner,
|
||||||
|
};
|
||||||
|
pub use options::{ClaudeAgentOptions, Effort, McpServersConfig, PermissionMode, SystemPrompt};
|
||||||
|
pub use query::query;
|
||||||
|
pub use transport::SubprocessTransport;
|
||||||
|
|
||||||
|
/// Crate version, as set in `Cargo.toml`. Sent to the CLI as
|
||||||
|
/// `CLAUDE_AGENT_SDK_VERSION` in the subprocess env.
|
||||||
|
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||||
140
src/query.rs
Normal file
140
src/query.rs
Normal file
|
|
@ -0,0 +1,140 @@
|
||||||
|
//! [`query`] — one-shot fire-and-forget interaction with Claude Code.
|
||||||
|
//!
|
||||||
|
//! Mirror of the Python SDK's `query()` function. Returns a [`Stream`] of
|
||||||
|
//! messages and shuts down the underlying subprocess once the stream is
|
||||||
|
//! exhausted or dropped.
|
||||||
|
//!
|
||||||
|
//! ```no_run
|
||||||
|
//! use claude_agent_sdk::{query, ClaudeAgentOptions, Message};
|
||||||
|
//! use tokio_stream::StreamExt;
|
||||||
|
//!
|
||||||
|
//! # async fn run() -> claude_agent_sdk::Result<()> {
|
||||||
|
//! let mut stream = query("What is 2 + 2?", ClaudeAgentOptions::new()).await?;
|
||||||
|
//! while let Some(msg) = stream.next().await {
|
||||||
|
//! match msg? {
|
||||||
|
//! Message::Assistant(a) => println!("{:?}", a.message.content),
|
||||||
|
//! Message::Result(_) => break,
|
||||||
|
//! _ => {}
|
||||||
|
//! }
|
||||||
|
//! }
|
||||||
|
//! # Ok(()) }
|
||||||
|
//! ```
|
||||||
|
|
||||||
|
use futures_core::Stream;
|
||||||
|
|
||||||
|
use crate::client::Client;
|
||||||
|
use crate::errors::Result;
|
||||||
|
use crate::messages::Message;
|
||||||
|
use crate::options::ClaudeAgentOptions;
|
||||||
|
|
||||||
|
/// Send a single prompt and return a stream of messages until the terminal
|
||||||
|
/// `result` message arrives, then shut down the subprocess.
|
||||||
|
///
|
||||||
|
/// The returned stream automatically calls [`Client::disconnect`] when the
|
||||||
|
/// terminal [`Message::Result`] is observed (or when the stream is dropped),
|
||||||
|
/// so callers don't need to manage the subprocess lifecycle by hand.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// - [`crate::Error::CliNotFound`] when the `claude` binary can't be found.
|
||||||
|
/// - [`crate::Error::CliConnection`] when the subprocess fails to spawn.
|
||||||
|
/// - Per-message [`crate::Error::MessageParse`] or
|
||||||
|
/// [`crate::Error::JsonDecode`] when the CLI emits an unparseable frame
|
||||||
|
/// mid-stream. These are emitted as `Err` items in the stream, not failures
|
||||||
|
/// at `query()` call time.
|
||||||
|
///
|
||||||
|
/// [`Client::disconnect`]: crate::Client::disconnect
|
||||||
|
pub async fn query(
|
||||||
|
prompt: impl Into<String>,
|
||||||
|
options: ClaudeAgentOptions,
|
||||||
|
) -> Result<impl Stream<Item = Result<Message>> + Send + 'static> {
|
||||||
|
let prompt = prompt.into();
|
||||||
|
let mut client = Client::new(options).await?;
|
||||||
|
client.connect().await?;
|
||||||
|
client.send(&prompt).await?;
|
||||||
|
let inner = client.messages();
|
||||||
|
Ok(StopAfterResult::new(inner, client))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stream adapter that owns a [`Client`] and stops yielding after the first
|
||||||
|
/// [`Message::Result`], draining and disconnecting the client as part of
|
||||||
|
/// shutdown.
|
||||||
|
struct StopAfterResult<S> {
|
||||||
|
inner: S,
|
||||||
|
client: Option<Client>,
|
||||||
|
finished: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> StopAfterResult<S>
|
||||||
|
where
|
||||||
|
S: Stream<Item = Result<Message>> + Unpin + Send,
|
||||||
|
{
|
||||||
|
fn new(inner: S, client: Client) -> Self {
|
||||||
|
Self {
|
||||||
|
inner,
|
||||||
|
client: Some(client),
|
||||||
|
finished: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> Stream for StopAfterResult<S>
|
||||||
|
where
|
||||||
|
S: Stream<Item = Result<Message>> + Unpin + Send,
|
||||||
|
{
|
||||||
|
type Item = Result<Message>;
|
||||||
|
|
||||||
|
fn poll_next(
|
||||||
|
mut self: std::pin::Pin<&mut Self>,
|
||||||
|
cx: &mut std::task::Context<'_>,
|
||||||
|
) -> std::task::Poll<Option<Self::Item>> {
|
||||||
|
if self.finished {
|
||||||
|
return std::task::Poll::Ready(None);
|
||||||
|
}
|
||||||
|
match std::pin::Pin::new(&mut self.inner).poll_next(cx) {
|
||||||
|
std::task::Poll::Pending => std::task::Poll::Pending,
|
||||||
|
std::task::Poll::Ready(None) => {
|
||||||
|
self.finished = true;
|
||||||
|
self.spawn_disconnect();
|
||||||
|
std::task::Poll::Ready(None)
|
||||||
|
}
|
||||||
|
std::task::Poll::Ready(Some(item)) => {
|
||||||
|
let stop = matches!(&item, Ok(m) if m.is_result());
|
||||||
|
if stop {
|
||||||
|
self.finished = true;
|
||||||
|
self.spawn_disconnect();
|
||||||
|
}
|
||||||
|
std::task::Poll::Ready(Some(item))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> StopAfterResult<S> {
|
||||||
|
fn spawn_disconnect(&mut self) {
|
||||||
|
if let Some(mut client) = self.client.take() {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = client.disconnect().await {
|
||||||
|
tracing::debug!("query subprocess disconnect error: {e}");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> Drop for StopAfterResult<S> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
// If the consumer dropped the stream early (before a Result message),
|
||||||
|
// tear down the subprocess too. spawn() into the active runtime; if
|
||||||
|
// there is no runtime, the underlying SubprocessTransport::Drop
|
||||||
|
// still kicks the child.
|
||||||
|
if let Some(mut client) = self.client.take() {
|
||||||
|
if tokio::runtime::Handle::try_current().is_ok() {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let _ = client.disconnect().await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue