From 1e7407867f18abdc15b4b404f5b48ee85b4ab8d7 Mon Sep 17 00:00:00 2001 From: Clark Alesna Date: Wed, 3 Jan 2024 19:34:16 +0800 Subject: [PATCH] feat(network): implement split read / write for NamedPipe bearer (#371) --- examples/n2c-miniprotocols/src/main.rs | 40 +++++++++++++++------ pallas-network/src/facades.rs | 48 ++++++++++++++------------ pallas-network/src/multiplexer.rs | 45 ++++++++++++++++++------ 3 files changed, 90 insertions(+), 43 deletions(-) diff --git a/examples/n2c-miniprotocols/src/main.rs b/examples/n2c-miniprotocols/src/main.rs index 9ab73bb..4c6c00b 100644 --- a/examples/n2c-miniprotocols/src/main.rs +++ b/examples/n2c-miniprotocols/src/main.rs @@ -1,11 +1,11 @@ use pallas::{ - ledger::addresses::Address, + ledger::{addresses::Address, traverse::MultiEraBlock}, network::{ facades::NodeClient, miniprotocols::{ chainsync, localstate::queries_v16::{self, Addr, Addrs}, - Point, PRE_PRODUCTION_MAGIC, + Point, PRE_PRODUCTION_MAGIC, PREVIEW_MAGIC }, }, }; @@ -66,15 +66,15 @@ async fn do_chainsync(client: &mut NodeClient) { info!("intersected point is {:?}", point); - for _ in 0..10 { + loop { let next = client.chainsync().request_next().await.unwrap(); - match next { chainsync::NextResponse::RollForward(h, _) => { - log::info!("rolling forward, block size: {}", h.len()) + let block_number = MultiEraBlock::decode(&h).unwrap().number(); + info!("rolling forward {}, block size: {}", block_number, h.len()) } - chainsync::NextResponse::RollBackward(x, _) => log::info!("rollback to {:?}", x), - chainsync::NextResponse::Await => log::info!("tip of chain reached"), + chainsync::NextResponse::RollBackward(x, _) => info!("rollback to {:?}", x), + chainsync::NextResponse::Await => info!("tip of chain reached"), }; } } @@ -82,6 +82,7 @@ async fn do_chainsync(client: &mut NodeClient) { // change the following to match the Cardano node socket in your local // environment const SOCKET_PATH: &str = "/tmp/node.socket"; +const PIPE_NAME: &str = "\\\\.\\pipe\\cardano-pallas"; #[cfg(unix)] #[tokio::main] @@ -106,7 +107,26 @@ async fn main() { do_chainsync(&mut client).await; } -#[cfg(not(target_family = "unix"))] -fn main() { - panic!("can't use n2c unix socket on non-unix systems"); +#[cfg(target_family = "windows")] +#[tokio::main] +async fn main() { + + tracing::subscriber::set_global_default( + tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::TRACE) + .finish(), + ) + .unwrap(); + + // we connect to the namedpipe of the local node. Make sure you have the right + // path for your environment + let mut client = NodeClient::connect(PIPE_NAME, PREVIEW_MAGIC) + .await + .unwrap(); + + // execute an arbitrary "Local State" query against the node + do_localstate_query(&mut client).await; + + // execute the chainsync flow from an arbitrary point in the chain + do_chainsync(&mut client).await; } diff --git a/pallas-network/src/facades.rs b/pallas-network/src/facades.rs index d5f87f2..04247ff 100644 --- a/pallas-network/src/facades.rs +++ b/pallas-network/src/facades.rs @@ -255,33 +255,37 @@ impl NodeClient { Ok(client) } - // #[cfg(windows)] - // pub async fn connect( - // pipe_name: impl AsRef, - // magic: u64, - // ) -> Result { - // let bearer = tokio::task::spawn_blocking(move || - // Bearer::connect_named_pipe(pipe_name)) .await - // .expect("can't join tokio thread") - // .map_err(Error::ConnectFailure)?; + #[cfg(windows)] + pub async fn connect( + pipe_name: impl AsRef, + magic: u64, + ) -> Result { + let pipe_name = pipe_name.as_ref().to_os_string(); - // let mut client = Self::new(bearer); + let bearer = tokio::task::spawn_blocking(move || { + Bearer::connect_named_pipe(pipe_name) + }) + .await + .expect("can't join tokio thread") + .map_err(Error::ConnectFailure)?; - // let versions = handshake::n2c::VersionTable::v10_and_above(magic); + let mut client = Self::new(bearer); - // let handshake = client - // .handshake() - // .handshake(versions) - // .await - // .map_err(Error::HandshakeProtocol)?; + let versions = handshake::n2c::VersionTable::v10_and_above(magic); - // if let handshake::Confirmation::Rejected(reason) = handshake { - // error!(?reason, "handshake refused"); - // return Err(Error::IncompatibleVersion); - // } + let handshake = client + .handshake() + .handshake(versions) + .await + .map_err(Error::HandshakeProtocol)?; - // Ok(client) - // } + if let handshake::Confirmation::Rejected(reason) = handshake { + error!(?reason, "handshake refused"); + return Err(Error::IncompatibleVersion); + } + + Ok(client) + } #[cfg(unix)] pub async fn handshake_query( diff --git a/pallas-network/src/multiplexer.rs b/pallas-network/src/multiplexer.rs index 746b2f6..1fa34ff 100644 --- a/pallas-network/src/multiplexer.rs +++ b/pallas-network/src/multiplexer.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use byteorder::{ByteOrder, NetworkEndian}; use pallas_codec::{minicbor, Fragment}; use thiserror::Error; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}; use tokio::task::JoinHandle; use tokio::time::Instant; use tokio::{select, sync::mpsc::error::SendError}; @@ -71,8 +71,9 @@ pub enum Bearer { #[cfg(unix)] Unix(unix::UnixStream), - // #[cfg(windows)] - // NamedPipe(NamedPipeClient), + + #[cfg(windows)] + NamedPipe(NamedPipeClient), } impl Bearer { @@ -124,12 +125,12 @@ impl Bearer { Ok((Self::Unix(stream), addr)) } - // #[cfg(windows)] - // pub fn connect_named_pipe(pipe_name: impl AsRef) -> - // IOResult { let client = - // tokio::net::windows::named_pipe::ClientOptions::new().open(&pipe_name)?; - // Ok(Self::NamedPipe(client)) - // } + #[cfg(windows)] + pub fn connect_named_pipe(pipe_name: impl AsRef) -> + IOResult { let client = + tokio::net::windows::named_pipe::ClientOptions::new().open(&pipe_name)?; + Ok(Self::NamedPipe(client)) + } pub fn into_split(self) -> (BearerReadHalf, BearerWriteHalf) { match self { @@ -143,6 +144,15 @@ impl Bearer { let (r, w) = x.into_split(); (BearerReadHalf::Unix(r), BearerWriteHalf::Unix(w)) } + + #[cfg(windows)] + Bearer::NamedPipe(x) => { + let (read, write) = tokio::io::split(x); + let reader = BearerReadHalf::NamedPipe(read); + let writer = BearerWriteHalf::NamedPipe(write); + + (reader, writer) + } } } } @@ -152,6 +162,9 @@ pub enum BearerReadHalf { #[cfg(unix)] Unix(unix::unix::OwnedReadHalf), + + #[cfg(windows)] + NamedPipe(ReadHalf), } impl BearerReadHalf { @@ -161,6 +174,9 @@ impl BearerReadHalf { #[cfg(unix)] BearerReadHalf::Unix(x) => x.read_exact(buf).await, + + #[cfg(windows)] + BearerReadHalf::NamedPipe(x) => x.read_exact(buf).await, } } } @@ -170,6 +186,9 @@ pub enum BearerWriteHalf { #[cfg(unix)] Unix(unix::unix::OwnedWriteHalf), + + #[cfg(windows)] + NamedPipe(WriteHalf), } impl BearerWriteHalf { @@ -179,6 +198,9 @@ impl BearerWriteHalf { #[cfg(unix)] Self::Unix(x) => x.write_all(buf).await, + + #[cfg(windows)] + Self::NamedPipe(x) => x.write_all(buf).await, } } @@ -188,8 +210,9 @@ impl BearerWriteHalf { #[cfg(unix)] Self::Unix(x) => x.flush().await, - //#[cfg(windows)] - //Bearer::NamedPipe(x) => x.flush().await, + + #[cfg(windows)] + Self::NamedPipe(x) => x.flush().await, } } }