From 7f919dcef545ae3483c3f799057eda76b32cd1a6 Mon Sep 17 00:00:00 2001 From: Harper Date: Fri, 1 Sep 2023 13:15:14 +0100 Subject: [PATCH] feat(network): add server-side facades (#282) --- pallas-network/src/facades.rs | 123 +++++++++++++++++- .../src/miniprotocols/handshake/n2c.rs | 2 +- .../src/miniprotocols/handshake/n2n.rs | 2 +- .../src/miniprotocols/handshake/server.rs | 67 +++++++++- pallas-network/src/multiplexer.rs | 12 +- pallas-network/tests/plexer.rs | 2 +- pallas-network/tests/protocols.rs | 39 +----- 7 files changed, 206 insertions(+), 41 deletions(-) diff --git a/pallas-network/src/facades.rs b/pallas-network/src/facades.rs index 31a57c7..a9eba3a 100644 --- a/pallas-network/src/facades.rs +++ b/pallas-network/src/facades.rs @@ -1,14 +1,17 @@ use std::path::Path; use thiserror::Error; +use tokio::net::{TcpListener, UnixListener}; use tokio::task::JoinHandle; use tracing::{debug, error}; -use crate::miniprotocols::handshake::Confirmation; +use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber}; +use crate::miniprotocols::PROTOCOL_N2N_HANDSHAKE; use crate::{ miniprotocols::{ blockfetch, chainsync, handshake, localstate, PROTOCOL_N2C_CHAIN_SYNC, - PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY, + PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY, PROTOCOL_N2N_BLOCK_FETCH, + PROTOCOL_N2N_CHAIN_SYNC, }, multiplexer::{self, Bearer}, }; @@ -25,6 +28,7 @@ pub enum Error { IncompatibleVersion, } +/// Client of N2N Ouroboros pub struct PeerClient { plexer_handle: JoinHandle>, pub handshake: handshake::Confirmation, @@ -81,6 +85,64 @@ impl PeerClient { } } +/// Server of N2N Ouroboros +pub struct PeerServer { + plexer_handle: JoinHandle>, + pub version: (VersionNumber, n2n::VersionData), + chainsync: chainsync::N2NServer, + blockfetch: blockfetch::Server, +} + +impl PeerServer { + pub async fn accept(listener: &TcpListener, magic: u64) -> Result { + let (bearer, _) = Bearer::accept_tcp(&listener) + .await + .map_err(Error::ConnectFailure)?; + + let mut server_plexer = multiplexer::Plexer::new(bearer); + + let hs_channel = server_plexer.subscribe_server(PROTOCOL_N2N_HANDSHAKE); + let cs_channel = server_plexer.subscribe_server(PROTOCOL_N2N_CHAIN_SYNC); + let bf_channel = server_plexer.subscribe_server(PROTOCOL_N2N_BLOCK_FETCH); + + let mut server_hs: handshake::Server = handshake::Server::new(hs_channel); + let server_cs = chainsync::N2NServer::new(cs_channel); + let server_bf = blockfetch::Server::new(bf_channel); + + let plexer_handle = tokio::spawn(async move { server_plexer.run().await }); + + let accepted_version = server_hs + .handshake(n2n::VersionTable::v7_and_above(magic)) + .await + .map_err(Error::HandshakeProtocol)?; + + if let Some(ver) = accepted_version { + Ok(Self { + plexer_handle, + version: ver, + chainsync: server_cs, + blockfetch: server_bf, + }) + } else { + plexer_handle.abort(); + Err(Error::IncompatibleVersion) + } + } + + pub fn chainsync(&mut self) -> &mut chainsync::N2NServer { + &mut self.chainsync + } + + pub fn blockfetch(&mut self) -> &mut blockfetch::Server { + &mut self.blockfetch + } + + pub fn abort(&mut self) { + self.plexer_handle.abort(); + } +} + +/// Client of N2C Ouroboros pub struct NodeClient { plexer_handle: JoinHandle>, pub handshake: handshake::Confirmation, @@ -179,3 +241,60 @@ impl NodeClient { self.plexer_handle.abort(); } } + +/// Server of N2C Ouroboros +pub struct NodeServer { + plexer_handle: JoinHandle>, + pub version: (VersionNumber, n2c::VersionData), + chainsync: chainsync::N2CServer, + // statequery: localstate::Server, +} + +impl NodeServer { + pub async fn accept(listener: &UnixListener, magic: u64) -> Result { + let (bearer, _) = Bearer::accept_unix(listener) + .await + .map_err(Error::ConnectFailure)?; + + let mut server_plexer = multiplexer::Plexer::new(bearer); + + let hs_channel = server_plexer.subscribe_server(PROTOCOL_N2C_HANDSHAKE); + let cs_channel = server_plexer.subscribe_server(PROTOCOL_N2C_CHAIN_SYNC); + // let sq_channel = server_plexer.subscribe_server(PROTOCOL_N2C_STATE_QUERY); + + let mut server_hs: handshake::Server = handshake::Server::new(hs_channel); + let server_cs = chainsync::N2CServer::new(cs_channel); + // let server_sq = localstate::Server::new(sq_channel); + + let plexer_handle = tokio::spawn(async move { server_plexer.run().await }); + + let accepted_version = server_hs + .handshake(n2c::VersionTable::v10_and_above(magic)) + .await + .map_err(Error::HandshakeProtocol)?; + + if let Some(ver) = accepted_version { + Ok(Self { + plexer_handle, + version: ver, + chainsync: server_cs, + // statequery: server_sq + }) + } else { + plexer_handle.abort(); + Err(Error::IncompatibleVersion) + } + } + + pub fn chainsync(&mut self) -> &mut chainsync::N2CServer { + &mut self.chainsync + } + + // pub fn statequery(&mut self) -> &mut localstate::Server { + // &mut self.statequery + // } + + pub fn abort(&mut self) { + self.plexer_handle.abort(); + } +} diff --git a/pallas-network/src/miniprotocols/handshake/n2c.rs b/pallas-network/src/miniprotocols/handshake/n2c.rs index 16cac23..5d7a0bd 100644 --- a/pallas-network/src/miniprotocols/handshake/n2c.rs +++ b/pallas-network/src/miniprotocols/handshake/n2c.rs @@ -80,7 +80,7 @@ impl VersionTable { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct VersionData(NetworkMagic, Option); impl Encode<()> for VersionData { diff --git a/pallas-network/src/miniprotocols/handshake/n2n.rs b/pallas-network/src/miniprotocols/handshake/n2n.rs index e9cad83..b584123 100644 --- a/pallas-network/src/miniprotocols/handshake/n2n.rs +++ b/pallas-network/src/miniprotocols/handshake/n2n.rs @@ -57,7 +57,7 @@ impl VersionTable { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct VersionData { network_magic: u64, initiator_and_responder_diffusion_mode: bool, diff --git a/pallas-network/src/miniprotocols/handshake/server.rs b/pallas-network/src/miniprotocols/handshake/server.rs index cd05a5b..c06b9ce 100644 --- a/pallas-network/src/miniprotocols/handshake/server.rs +++ b/pallas-network/src/miniprotocols/handshake/server.rs @@ -1,6 +1,7 @@ use std::marker::PhantomData; use pallas_codec::Fragment; +use tracing::{debug, warn}; use super::{Error, Message, RefuseReason, State, VersionNumber, VersionTable}; use crate::multiplexer; @@ -9,7 +10,7 @@ pub struct Server(State, multiplexer::ChannelBuffer, PhantomData); impl Server where - D: std::fmt::Debug + Clone, + D: std::fmt::Debug + Clone + std::cmp::PartialEq, Message: Fragment, { pub fn new(channel: multiplexer::AgentChannel) -> Self { @@ -109,6 +110,70 @@ where Ok(()) } + /// Perform a handshake with the client + /// + /// Performs a full handshake with the client, where `versions` are the + /// acceptable versions supported by the server. + pub async fn handshake( + &mut self, + versions: VersionTable, + ) -> Result, Error> { + // receive proposed versions + let client_versions = self + .receive_proposed_versions() + .await? + .values + .into_iter() + .collect::>(); + + // find highest intersect with our version table (TODO: improve) + let mut versions = versions.values.into_iter().collect::>(); + + versions.sort_by(|a, b| b.0.cmp(&a.0)); + + for (ver_num, ver_data) in versions.clone() { + for (client_ver_num, client_ver_data) in client_versions.clone() { + if ver_num == client_ver_num { + if ver_data == client_ver_data { + // found a version number and extra data match + debug!("accepting hs with ({}, {:?})", ver_num, ver_data); + + self.accept_version(ver_num, ver_data.clone()).await?; + + return Ok(Some((ver_num, ver_data))); + } else { + warn!( + "rejecting hs as params not acceptable - server: {:?}, client: {:?}", + ver_data, client_ver_data + ); + + // found version number match but extra data not acceptable + self.refuse(RefuseReason::Refused( + ver_num, + "Proposed extra params don't match".into(), + )) + .await?; + + return Ok(None); + } + } + } + } + + warn!( + "rejecting hs as no version intersect found - server: {:?}, client: {:?}", + versions, client_versions + ); + + // failed to find a version number intersection + self.refuse(RefuseReason::VersionMismatch( + versions.into_iter().map(|(num, _)| num).collect(), + )) + .await?; + + Ok(None) + } + pub fn unwrap(self) -> multiplexer::AgentChannel { self.1.unwrap() } diff --git a/pallas-network/src/multiplexer.rs b/pallas-network/src/multiplexer.rs index 1262285..73b505d 100644 --- a/pallas-network/src/multiplexer.rs +++ b/pallas-network/src/multiplexer.rs @@ -6,7 +6,7 @@ use std::net::SocketAddr; use std::path::Path; use thiserror::Error; use tokio::io::AsyncWriteExt; -use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; +use tokio::net::{TcpListener, TcpStream, ToSocketAddrs, UnixListener}; use tokio::select; use tokio::sync::mpsc::error::SendError; use tokio::time::Instant; @@ -79,11 +79,19 @@ impl Bearer { Ok(Self::Tcp(stream)) } - pub async fn accept_tcp(listener: TcpListener) -> tokio::io::Result<(Self, SocketAddr)> { + pub async fn accept_tcp(listener: &TcpListener) -> tokio::io::Result<(Self, SocketAddr)> { let (stream, addr) = listener.accept().await?; Ok((Self::Tcp(stream), addr)) } + #[cfg(not(target_os = "windows"))] + pub async fn accept_unix( + listener: &UnixListener, + ) -> tokio::io::Result<(Self, tokio::net::unix::SocketAddr)> { + let (stream, addr) = listener.accept().await?; + Ok((Self::Unix(stream), addr)) + } + #[cfg(not(target_os = "windows"))] pub async fn connect_unix(path: impl AsRef) -> Result { let stream = UnixStream::connect(path).await?; diff --git a/pallas-network/tests/plexer.rs b/pallas-network/tests/plexer.rs index 2f03d96..f31e95f 100644 --- a/pallas-network/tests/plexer.rs +++ b/pallas-network/tests/plexer.rs @@ -11,7 +11,7 @@ async fn setup_passive_muxer() -> Plexer { println!("listening for connections on port {}", P); - let (bearer, _) = Bearer::accept_tcp(server).await.unwrap(); + let (bearer, _) = Bearer::accept_tcp(&server).await.unwrap(); Plexer::new(bearer) } diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index eb92f80..98e9ae1 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -1,17 +1,14 @@ use std::net::{Ipv4Addr, SocketAddrV4}; use std::time::Duration; -use pallas_network::facades::PeerClient; +use pallas_network::facades::{PeerClient, PeerServer}; use pallas_network::miniprotocols::blockfetch::BlockRequest; use pallas_network::miniprotocols::chainsync::{ClientRequest, HeaderContent, Tip}; -use pallas_network::miniprotocols::handshake; -use pallas_network::miniprotocols::handshake::n2n::VersionData; use pallas_network::miniprotocols::{ blockfetch, chainsync::{self, NextResponse}, Point, }; -use pallas_network::multiplexer::{Bearer, Plexer}; use tokio::net::TcpListener; #[tokio::test] @@ -175,21 +172,9 @@ pub async fn blockfetch_server_and_client_happy_path() { .await .unwrap(); - let (bearer, _) = Bearer::accept_tcp(server_listener).await.unwrap(); + let mut peer_server = PeerServer::accept(&server_listener, 0).await.unwrap(); - let mut server_plexer = Plexer::new(bearer); - - let mut server_hs: handshake::Server = - handshake::Server::new(server_plexer.subscribe_server(0)); - let mut server_bf = blockfetch::Server::new(server_plexer.subscribe_server(3)); - - tokio::spawn(async move { server_plexer.run().await }); - - server_hs.receive_proposed_versions().await.unwrap(); - server_hs - .accept_version(10, VersionData::new(0, false)) - .await - .unwrap(); + let server_bf = peer_server.blockfetch(); // server receives range from client, sends blocks @@ -278,21 +263,9 @@ pub async fn chainsync_server_and_client_happy_path_n2n() { .await .unwrap(); - let (bearer, _) = Bearer::accept_tcp(server_listener).await.unwrap(); + let mut peer_server = PeerServer::accept(&server_listener, 0).await.unwrap(); - let mut server_plexer = Plexer::new(bearer); - - let mut server_hs: handshake::Server = - handshake::Server::new(server_plexer.subscribe_server(0)); - let mut server_cs = chainsync::N2NServer::new(server_plexer.subscribe_server(2)); - - tokio::spawn(async move { server_plexer.run().await }); - - server_hs.receive_proposed_versions().await.unwrap(); - server_hs - .accept_version(10, VersionData::new(0, false)) - .await - .unwrap(); + let server_cs = peer_server.chainsync(); // server receives find intersect from client, sends intersect point @@ -386,7 +359,7 @@ pub async fn chainsync_server_and_client_happy_path_n2n() { }); let client = tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(2)).await; + tokio::time::sleep(Duration::from_secs(1)).await; // client setup