feat(network): implement split read / write for NamedPipe bearer (#371)

This commit is contained in:
Clark Alesna 2024-01-03 19:34:16 +08:00 committed by GitHub
parent 57d84fe1e9
commit 1e7407867f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 90 additions and 43 deletions

View file

@ -1,11 +1,11 @@
use pallas::{ use pallas::{
ledger::addresses::Address, ledger::{addresses::Address, traverse::MultiEraBlock},
network::{ network::{
facades::NodeClient, facades::NodeClient,
miniprotocols::{ miniprotocols::{
chainsync, chainsync,
localstate::queries_v16::{self, Addr, Addrs}, localstate::queries_v16::{self, Addr, Addrs},
Point, PRE_PRODUCTION_MAGIC, Point, PRE_PRODUCTION_MAGIC, PREVIEW_MAGIC
}, },
}, },
}; };
@ -66,15 +66,15 @@ async fn do_chainsync(client: &mut NodeClient) {
info!("intersected point is {:?}", point); info!("intersected point is {:?}", point);
for _ in 0..10 { loop {
let next = client.chainsync().request_next().await.unwrap(); let next = client.chainsync().request_next().await.unwrap();
match next { match next {
chainsync::NextResponse::RollForward(h, _) => { chainsync::NextResponse::RollForward(h, _) => {
log::info!("rolling forward, block size: {}", h.len()) let block_number = MultiEraBlock::decode(&h).unwrap().number();
info!("rolling forward {}, block size: {}", block_number, h.len())
} }
chainsync::NextResponse::RollBackward(x, _) => log::info!("rollback to {:?}", x), chainsync::NextResponse::RollBackward(x, _) => info!("rollback to {:?}", x),
chainsync::NextResponse::Await => log::info!("tip of chain reached"), chainsync::NextResponse::Await => info!("tip of chain reached"),
}; };
} }
} }
@ -82,6 +82,7 @@ async fn do_chainsync(client: &mut NodeClient) {
// change the following to match the Cardano node socket in your local // change the following to match the Cardano node socket in your local
// environment // environment
const SOCKET_PATH: &str = "/tmp/node.socket"; const SOCKET_PATH: &str = "/tmp/node.socket";
const PIPE_NAME: &str = "\\\\.\\pipe\\cardano-pallas";
#[cfg(unix)] #[cfg(unix)]
#[tokio::main] #[tokio::main]
@ -106,7 +107,26 @@ async fn main() {
do_chainsync(&mut client).await; do_chainsync(&mut client).await;
} }
#[cfg(not(target_family = "unix"))] #[cfg(target_family = "windows")]
fn main() { #[tokio::main]
panic!("can't use n2c unix socket on non-unix systems"); async fn main() {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish(),
)
.unwrap();
// we connect to the namedpipe of the local node. Make sure you have the right
// path for your environment
let mut client = NodeClient::connect(PIPE_NAME, PREVIEW_MAGIC)
.await
.unwrap();
// execute an arbitrary "Local State" query against the node
do_localstate_query(&mut client).await;
// execute the chainsync flow from an arbitrary point in the chain
do_chainsync(&mut client).await;
} }

View file

@ -255,33 +255,37 @@ impl NodeClient {
Ok(client) Ok(client)
} }
// #[cfg(windows)] #[cfg(windows)]
// pub async fn connect( pub async fn connect(
// pipe_name: impl AsRef<std::ffi::OsStr>, pipe_name: impl AsRef<std::ffi::OsStr>,
// magic: u64, magic: u64,
// ) -> Result<Self, Error> { ) -> Result<Self, Error> {
// let bearer = tokio::task::spawn_blocking(move || let pipe_name = pipe_name.as_ref().to_os_string();
// Bearer::connect_named_pipe(pipe_name)) .await
// .expect("can't join tokio thread")
// .map_err(Error::ConnectFailure)?;
// let mut client = Self::new(bearer); let bearer = tokio::task::spawn_blocking(move || {
Bearer::connect_named_pipe(pipe_name)
})
.await
.expect("can't join tokio thread")
.map_err(Error::ConnectFailure)?;
// let versions = handshake::n2c::VersionTable::v10_and_above(magic); let mut client = Self::new(bearer);
// let handshake = client let versions = handshake::n2c::VersionTable::v10_and_above(magic);
// .handshake()
// .handshake(versions)
// .await
// .map_err(Error::HandshakeProtocol)?;
// if let handshake::Confirmation::Rejected(reason) = handshake { let handshake = client
// error!(?reason, "handshake refused"); .handshake()
// return Err(Error::IncompatibleVersion); .handshake(versions)
// } .await
.map_err(Error::HandshakeProtocol)?;
// Ok(client) if let handshake::Confirmation::Rejected(reason) = handshake {
// } error!(?reason, "handshake refused");
return Err(Error::IncompatibleVersion);
}
Ok(client)
}
#[cfg(unix)] #[cfg(unix)]
pub async fn handshake_query( pub async fn handshake_query(

View file

@ -5,7 +5,7 @@ use std::collections::HashMap;
use byteorder::{ByteOrder, NetworkEndian}; use byteorder::{ByteOrder, NetworkEndian};
use pallas_codec::{minicbor, Fragment}; use pallas_codec::{minicbor, Fragment};
use thiserror::Error; use thiserror::Error;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::Instant; use tokio::time::Instant;
use tokio::{select, sync::mpsc::error::SendError}; use tokio::{select, sync::mpsc::error::SendError};
@ -71,8 +71,9 @@ pub enum Bearer {
#[cfg(unix)] #[cfg(unix)]
Unix(unix::UnixStream), Unix(unix::UnixStream),
// #[cfg(windows)]
// NamedPipe(NamedPipeClient), #[cfg(windows)]
NamedPipe(NamedPipeClient),
} }
impl Bearer { impl Bearer {
@ -124,12 +125,12 @@ impl Bearer {
Ok((Self::Unix(stream), addr)) Ok((Self::Unix(stream), addr))
} }
// #[cfg(windows)] #[cfg(windows)]
// pub fn connect_named_pipe(pipe_name: impl AsRef<std::ffi::OsStr>) -> pub fn connect_named_pipe(pipe_name: impl AsRef<std::ffi::OsStr>) ->
// IOResult<Self> { let client = IOResult<Self> { let client =
// tokio::net::windows::named_pipe::ClientOptions::new().open(&pipe_name)?; tokio::net::windows::named_pipe::ClientOptions::new().open(&pipe_name)?;
// Ok(Self::NamedPipe(client)) Ok(Self::NamedPipe(client))
// } }
pub fn into_split(self) -> (BearerReadHalf, BearerWriteHalf) { pub fn into_split(self) -> (BearerReadHalf, BearerWriteHalf) {
match self { match self {
@ -143,6 +144,15 @@ impl Bearer {
let (r, w) = x.into_split(); let (r, w) = x.into_split();
(BearerReadHalf::Unix(r), BearerWriteHalf::Unix(w)) (BearerReadHalf::Unix(r), BearerWriteHalf::Unix(w))
} }
#[cfg(windows)]
Bearer::NamedPipe(x) => {
let (read, write) = tokio::io::split(x);
let reader = BearerReadHalf::NamedPipe(read);
let writer = BearerWriteHalf::NamedPipe(write);
(reader, writer)
}
} }
} }
} }
@ -152,6 +162,9 @@ pub enum BearerReadHalf {
#[cfg(unix)] #[cfg(unix)]
Unix(unix::unix::OwnedReadHalf), Unix(unix::unix::OwnedReadHalf),
#[cfg(windows)]
NamedPipe(ReadHalf<NamedPipeClient>),
} }
impl BearerReadHalf { impl BearerReadHalf {
@ -161,6 +174,9 @@ impl BearerReadHalf {
#[cfg(unix)] #[cfg(unix)]
BearerReadHalf::Unix(x) => x.read_exact(buf).await, BearerReadHalf::Unix(x) => x.read_exact(buf).await,
#[cfg(windows)]
BearerReadHalf::NamedPipe(x) => x.read_exact(buf).await,
} }
} }
} }
@ -170,6 +186,9 @@ pub enum BearerWriteHalf {
#[cfg(unix)] #[cfg(unix)]
Unix(unix::unix::OwnedWriteHalf), Unix(unix::unix::OwnedWriteHalf),
#[cfg(windows)]
NamedPipe(WriteHalf<NamedPipeClient>),
} }
impl BearerWriteHalf { impl BearerWriteHalf {
@ -179,6 +198,9 @@ impl BearerWriteHalf {
#[cfg(unix)] #[cfg(unix)]
Self::Unix(x) => x.write_all(buf).await, Self::Unix(x) => x.write_all(buf).await,
#[cfg(windows)]
Self::NamedPipe(x) => x.write_all(buf).await,
} }
} }
@ -188,8 +210,9 @@ impl BearerWriteHalf {
#[cfg(unix)] #[cfg(unix)]
Self::Unix(x) => x.flush().await, Self::Unix(x) => x.flush().await,
//#[cfg(windows)]
//Bearer::NamedPipe(x) => x.flush().await, #[cfg(windows)]
Self::NamedPipe(x) => x.flush().await,
} }
} }
} }