From 1ed2161a43a6b066d3138d9865f82e4ff30c9398 Mon Sep 17 00:00:00 2001 From: Andrew Westberg Date: Tue, 19 Dec 2023 05:56:10 -0500 Subject: [PATCH] feat(network): update n2n handshake versions & add keepalive miniprotocol (#362) --- .gitignore | 1 + examples/n2n-miniprotocols/src/main.rs | 6 + pallas-network/Cargo.toml | 2 +- pallas-network/src/facades.rs | 21 +-- .../src/miniprotocols/handshake/n2n.rs | 99 +++++++++---- .../src/miniprotocols/keepalive/client.rs | 132 ++++++++++++++++++ .../src/miniprotocols/keepalive/codec.rs | 49 +++++++ .../src/miniprotocols/keepalive/mod.rs | 7 + .../src/miniprotocols/keepalive/protocol.rs | 15 ++ pallas-network/src/miniprotocols/mod.rs | 1 + pallas-network/src/multiplexer.rs | 8 ++ pallas-network/tests/protocols.rs | 2 +- 12 files changed, 302 insertions(+), 41 deletions(-) create mode 100644 pallas-network/src/miniprotocols/keepalive/client.rs create mode 100644 pallas-network/src/miniprotocols/keepalive/codec.rs create mode 100644 pallas-network/src/miniprotocols/keepalive/mod.rs create mode 100644 pallas-network/src/miniprotocols/keepalive/protocol.rs diff --git a/.gitignore b/.gitignore index 0fef8b6..92688d4 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ scratchpad .DS_Store RELEASE.md .idea +.vscode diff --git a/examples/n2n-miniprotocols/src/main.rs b/examples/n2n-miniprotocols/src/main.rs index 4a7fbc6..f3d06d9 100644 --- a/examples/n2n-miniprotocols/src/main.rs +++ b/examples/n2n-miniprotocols/src/main.rs @@ -2,6 +2,7 @@ use pallas::network::{ facades::PeerClient, miniprotocols::{chainsync, Point, MAINNET_MAGIC}, }; +use tokio::time::Instant; use tracing::info; async fn do_blockfetch(peer: &mut PeerClient) { @@ -35,7 +36,12 @@ async fn do_chainsync(peer: &mut PeerClient) { info!("intersected point is {:?}", point); + let mut keepalive_timer = Instant::now(); for _ in 0..10 { + if keepalive_timer.elapsed().as_secs() > 20 { + peer.keepalive().send_keepalive().await.unwrap(); + keepalive_timer = Instant::now(); + } let next = peer.chainsync().request_next().await.unwrap(); match next { diff --git a/pallas-network/Cargo.toml b/pallas-network/Cargo.toml index 73b74b4..381d33e 100644 --- a/pallas-network/Cargo.toml +++ b/pallas-network/Cargo.toml @@ -16,6 +16,7 @@ hex = "0.4.3" itertools = "0.10.5" pallas-codec = { version = "=0.20.0", path = "../pallas-codec" } pallas-crypto = { version = "=0.20.0", path = "../pallas-crypto" } +rand = "0.8.5" thiserror = "1.0.31" tokio = { version = "1", features = ["rt", "net", "io-util", "time", "sync", "macros"] } tracing = "0.1.37" @@ -23,5 +24,4 @@ tracing = "0.1.37" [dev-dependencies] tracing-subscriber = "0.3.16" tokio = { version = "1", features = ["full"] } -rand = "0.8.5" diff --git a/pallas-network/src/facades.rs b/pallas-network/src/facades.rs index ddbee18..31d3412 100644 --- a/pallas-network/src/facades.rs +++ b/pallas-network/src/facades.rs @@ -10,15 +10,13 @@ use tokio::net::UnixListener; use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber, VersionTable}; -use crate::miniprotocols::{txsubmission, PROTOCOL_N2N_HANDSHAKE, PROTOCOL_N2N_TX_SUBMISSION}; -use crate::{ - miniprotocols::{ - blockfetch, chainsync, handshake, localstate, PROTOCOL_N2C_CHAIN_SYNC, - PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY, PROTOCOL_N2N_BLOCK_FETCH, - PROTOCOL_N2N_CHAIN_SYNC, - }, - multiplexer::{self, Bearer}, +use crate::miniprotocols::{ + txsubmission, keepalive, blockfetch, chainsync, handshake, localstate, + PROTOCOL_N2N_HANDSHAKE, PROTOCOL_N2N_TX_SUBMISSION, PROTOCOL_N2N_KEEP_ALIVE, + PROTOCOL_N2C_CHAIN_SYNC, PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY, + PROTOCOL_N2N_BLOCK_FETCH, PROTOCOL_N2N_CHAIN_SYNC, }; +use crate::multiplexer::{self, Bearer}; #[derive(Debug, Error)] pub enum Error { @@ -39,6 +37,7 @@ pub struct PeerClient { pub chainsync: chainsync::N2NClient, pub blockfetch: blockfetch::Client, pub txsubmission: txsubmission::Client, + pub keepalive: keepalive::Client, } impl PeerClient { @@ -54,6 +53,7 @@ impl PeerClient { let cs_channel = plexer.subscribe_client(PROTOCOL_N2N_CHAIN_SYNC); let bf_channel = plexer.subscribe_client(PROTOCOL_N2N_BLOCK_FETCH); let txsub_channel = plexer.subscribe_client(PROTOCOL_N2N_TX_SUBMISSION); + let keepalive_channel = plexer.subscribe_client(PROTOCOL_N2N_KEEP_ALIVE); let plexer_handle = tokio::spawn(async move { plexer.run().await }); @@ -76,6 +76,7 @@ impl PeerClient { chainsync: chainsync::Client::new(cs_channel), blockfetch: blockfetch::Client::new(bf_channel), txsubmission: txsubmission::Client::new(txsub_channel), + keepalive: keepalive::Client::new(keepalive_channel), }) } @@ -91,6 +92,10 @@ impl PeerClient { &mut self.txsubmission } + pub fn keepalive(&mut self) -> &mut keepalive::Client { + &mut self.keepalive + } + pub fn abort(&mut self) { self.plexer_handle.abort(); } diff --git a/pallas-network/src/miniprotocols/handshake/n2n.rs b/pallas-network/src/miniprotocols/handshake/n2n.rs index b584123..ad7bc17 100644 --- a/pallas-network/src/miniprotocols/handshake/n2n.rs +++ b/pallas-network/src/miniprotocols/handshake/n2n.rs @@ -1,41 +1,38 @@ use std::collections::HashMap; -use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}; +use pallas_codec::minicbor::{decode, Decode, Decoder, encode, Encode, Encoder}; pub type VersionTable = super::protocol::VersionTable; -const PROTOCOL_V4: u64 = 4; -const PROTOCOL_V5: u64 = 5; -const PROTOCOL_V6: u64 = 6; const PROTOCOL_V7: u64 = 7; const PROTOCOL_V8: u64 = 8; const PROTOCOL_V9: u64 = 9; const PROTOCOL_V10: u64 = 10; +const PROTOCOL_V11: u64 = 11; +const PROTOCOL_V12: u64 = 12; +const PROTOCOL_V13: u64 = 13; + +const PEER_SHARING_DISABLED: u8 = 0; impl VersionTable { pub fn v4_and_above(network_magic: u64) -> VersionTable { - let values = vec![ - (PROTOCOL_V4, VersionData::new(network_magic, false)), - (PROTOCOL_V5, VersionData::new(network_magic, false)), - (PROTOCOL_V6, VersionData::new(network_magic, false)), - (PROTOCOL_V7, VersionData::new(network_magic, false)), - (PROTOCOL_V8, VersionData::new(network_magic, false)), - (PROTOCOL_V9, VersionData::new(network_magic, false)), - (PROTOCOL_V10, VersionData::new(network_magic, false)), - ] - .into_iter() - .collect::>(); - - VersionTable { values } + // Older versions are not supported anymore (removed from network-spec.pdf). + // Try not to break compatibility with older pallas users. + return Self::v7_and_above(network_magic); } pub fn v6_and_above(network_magic: u64) -> VersionTable { + // Older versions are not supported anymore (removed from network-spec.pdf). + // Try not to break compatibility with older pallas users. + return Self::v7_and_above(network_magic); + } + + pub fn v7_to_v10(network_magic: u64) -> VersionTable { let values = vec![ - (PROTOCOL_V6, VersionData::new(network_magic, false)), - (PROTOCOL_V7, VersionData::new(network_magic, false)), - (PROTOCOL_V8, VersionData::new(network_magic, false)), - (PROTOCOL_V9, VersionData::new(network_magic, false)), - (PROTOCOL_V10, VersionData::new(network_magic, false)), + (PROTOCOL_V7, VersionData::new(network_magic, false, None, None)), + (PROTOCOL_V8, VersionData::new(network_magic, false, None, None)), + (PROTOCOL_V9, VersionData::new(network_magic, false, None, None)), + (PROTOCOL_V10, VersionData::new(network_magic, false, None, None)), ] .into_iter() .collect::>(); @@ -45,10 +42,25 @@ impl VersionTable { pub fn v7_and_above(network_magic: u64) -> VersionTable { let values = vec![ - (PROTOCOL_V7, VersionData::new(network_magic, false)), - (PROTOCOL_V8, VersionData::new(network_magic, false)), - (PROTOCOL_V9, VersionData::new(network_magic, false)), - (PROTOCOL_V10, VersionData::new(network_magic, false)), + (PROTOCOL_V7, VersionData::new(network_magic, false, None, None)), + (PROTOCOL_V8, VersionData::new(network_magic, false, None, None)), + (PROTOCOL_V9, VersionData::new(network_magic, false, None, None)), + (PROTOCOL_V10, VersionData::new(network_magic, false, None, None)), + (PROTOCOL_V11, VersionData::new(network_magic, false, Some(PEER_SHARING_DISABLED), Some(false))), + (PROTOCOL_V12, VersionData::new(network_magic, false, Some(PEER_SHARING_DISABLED), Some(false))), + (PROTOCOL_V13, VersionData::new(network_magic, false, Some(PEER_SHARING_DISABLED), Some(false))), + ] + .into_iter() + .collect::>(); + + VersionTable { values } + } + + pub fn v11_and_above(network_magic: u64) -> VersionTable { + let values = vec![ + (PROTOCOL_V11, VersionData::new(network_magic, false, Some(PEER_SHARING_DISABLED), Some(false))), + (PROTOCOL_V12, VersionData::new(network_magic, false, Some(PEER_SHARING_DISABLED), Some(false))), + (PROTOCOL_V13, VersionData::new(network_magic, false, Some(PEER_SHARING_DISABLED), Some(false))), ] .into_iter() .collect::>(); @@ -61,13 +73,17 @@ impl VersionTable { pub struct VersionData { network_magic: u64, initiator_and_responder_diffusion_mode: bool, + peer_sharing: Option, + query: Option, } impl VersionData { - pub fn new(network_magic: u64, initiator_and_responder_diffusion_mode: bool) -> Self { + pub fn new(network_magic: u64, initiator_and_responder_diffusion_mode: bool, peer_sharing: Option, query: Option) -> Self { VersionData { network_magic, initiator_and_responder_diffusion_mode, + peer_sharing, + query, } } } @@ -78,9 +94,20 @@ impl Encode<()> for VersionData { e: &mut Encoder, _ctx: &mut (), ) -> Result<(), encode::Error> { - e.array(2)? - .u64(self.network_magic)? - .bool(self.initiator_and_responder_diffusion_mode)?; + match (self.peer_sharing, self.query) { + (Some(peer_sharing), Some(query)) => { + e.array(4)? + .u64(self.network_magic)? + .bool(self.initiator_and_responder_diffusion_mode)? + .u8(peer_sharing)? + .bool(query)?; + }, + _ => { + e.array(2)? + .u64(self.network_magic)? + .bool(self.initiator_and_responder_diffusion_mode)?; + }, + }; Ok(()) } @@ -88,13 +115,23 @@ impl Encode<()> for VersionData { impl<'b> Decode<'b, ()> for VersionData { fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { - d.array()?; + let len = d.array()?; let network_magic = d.u64()?; let initiator_and_responder_diffusion_mode = d.bool()?; + let peer_sharing = match len { + Some(4) => Some(d.u8()?), + _ => None, + }; + let query = match len { + Some(4) => Some(d.bool()?), + _ => None, + }; Ok(Self { network_magic, initiator_and_responder_diffusion_mode, + peer_sharing, + query, }) } } diff --git a/pallas-network/src/miniprotocols/keepalive/client.rs b/pallas-network/src/miniprotocols/keepalive/client.rs new file mode 100644 index 0000000..45aa48b --- /dev/null +++ b/pallas-network/src/miniprotocols/keepalive/client.rs @@ -0,0 +1,132 @@ +use std::fmt::Debug; +use rand::Rng; +use thiserror::*; +use tracing::debug; + +use super::protocol::*; +use crate::multiplexer; + +#[derive(Error, Debug)] +pub enum Error { + #[error("attempted to receive message while agency is ours")] + AgencyIsOurs, + + #[error("attempted to send message while agency is theirs")] + AgencyIsTheirs, + + #[error("inbound message is not valid for current state")] + InvalidInbound, + + #[error("outbound message is not valid for current state")] + InvalidOutbound, + + #[error("keepalive cookie mismatch")] + KeepAliveCookieMismatch, + + #[error("error while sending or receiving data through the channel")] + Plexer(multiplexer::Error), +} + +pub struct KeepAliveSharedState { + saved_cookie: u16, +} + +pub struct Client(State, multiplexer::ChannelBuffer, KeepAliveSharedState); + +impl Client { + pub fn new(channel: multiplexer::AgentChannel) -> Self { + Self(State::Client, multiplexer::ChannelBuffer::new(channel), KeepAliveSharedState{ saved_cookie: 0 }) + } + + pub fn state(&self) -> &State { + &self.0 + } + + pub fn is_done(&self) -> bool { + self.0 == State::Done + } + + fn has_agency(&self) -> bool { + match &self.0 { + State::Client => true, + State::Server => false, + State::Done => false, + } + } + + fn assert_agency_is_ours(&self) -> Result<(), Error> { + if !self.has_agency() { + Err(Error::AgencyIsTheirs) + } else { + Ok(()) + } + } + + fn assert_agency_is_theirs(&self) -> Result<(), Error> { + if self.has_agency() { + Err(Error::AgencyIsOurs) + } else { + Ok(()) + } + } + + fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { + match (&self.0, msg) { + (State::Client, Message::KeepAlive(..)) => Ok(()), + (State::Client, Message::Done) => Ok(()), + _ => Err(Error::InvalidOutbound), + } + } + + fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { + match (&self.0, msg) { + (State::Server, Message::ResponseKeepAlive(..)) => Ok(()), + _ => Err(Error::InvalidInbound), + } + } + + pub async fn send_message(&mut self, msg: &Message) -> Result<(), Error> { + self.assert_agency_is_ours()?; + self.assert_outbound_state(msg)?; + self.1.send_msg_chunks(msg).await.map_err(Error::Plexer)?; + + Ok(()) + } + + pub async fn recv_message(&mut self) -> Result { + self.assert_agency_is_theirs()?; + let msg = self.1.recv_full_msg().await.map_err(Error::Plexer)?; + self.assert_inbound_state(&msg)?; + + Ok(msg) + } + + pub async fn send_keepalive(&mut self) -> Result<(), Error> { + // generate random cookie value + let cookie = rand::thread_rng().gen::(); + let msg = Message::KeepAlive(cookie); + self.send_message(&msg).await?; + self.2.saved_cookie = cookie; + self.0 = State::Server; + debug!("sent keepalive message with cookie {}", cookie); + + self.recv_while_sending_keepalive().await?; + + Ok(()) + } + + async fn recv_while_sending_keepalive(&mut self) -> Result<(), Error> { + match self.recv_message().await? { + Message::ResponseKeepAlive(cookie) => { + debug!("received keepalive response with cookie {}", cookie); + if cookie == self.2.saved_cookie { + self.0 = State::Client; + Ok(()) + } else { + Err(Error::KeepAliveCookieMismatch) + } + } + _ => Err(Error::InvalidInbound), + } + } +} diff --git a/pallas-network/src/miniprotocols/keepalive/codec.rs b/pallas-network/src/miniprotocols/keepalive/codec.rs new file mode 100644 index 0000000..e00ed9b --- /dev/null +++ b/pallas-network/src/miniprotocols/keepalive/codec.rs @@ -0,0 +1,49 @@ +use super::protocol::*; +use pallas_codec::minicbor::{decode, encode, Decode, Encode, Encoder}; + +impl Encode<()> for Message { + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), encode::Error> { + match self { + Message::KeepAlive(cookie) => { + e.array(2)?.u16(0)?; + e.encode(cookie)?; + }, + Message::ResponseKeepAlive(cookie) => { + e.array(2)?.u16(1)?; + e.encode(cookie)?; + }, + Message::Done => { + e.array(1)?.u16(2)?; + }, + } + + Ok(()) + } +} + +impl<'b> Decode<'b, ()> for Message { + fn decode( + d: &mut pallas_codec::minicbor::Decoder<'b>, + _ctx: &mut (), + ) -> Result { + d.array()?; + let label = d.u16()?; + + match label { + 0 => { + let cookie = d.decode()?; + Ok(Message::KeepAlive(cookie)) + } + 1 => { + let cookie = d.decode()?; + Ok(Message::ResponseKeepAlive(cookie)) + } + 2 => Ok(Message::Done), + _ => Err(decode::Error::message("can't decode Message")), + } + } +} diff --git a/pallas-network/src/miniprotocols/keepalive/mod.rs b/pallas-network/src/miniprotocols/keepalive/mod.rs new file mode 100644 index 0000000..a9eaa04 --- /dev/null +++ b/pallas-network/src/miniprotocols/keepalive/mod.rs @@ -0,0 +1,7 @@ +mod client; +mod codec; +mod protocol; + +pub use client::*; +pub use codec::*; +pub use protocol::*; diff --git a/pallas-network/src/miniprotocols/keepalive/protocol.rs b/pallas-network/src/miniprotocols/keepalive/protocol.rs new file mode 100644 index 0000000..121228c --- /dev/null +++ b/pallas-network/src/miniprotocols/keepalive/protocol.rs @@ -0,0 +1,15 @@ +pub type KeepAliveCookie = u16; + +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum State { + Client, + Server, + Done, +} + +#[derive(Debug, Clone)] +pub enum Message { + KeepAlive(KeepAliveCookie), + ResponseKeepAlive(KeepAliveCookie), + Done, +} diff --git a/pallas-network/src/miniprotocols/mod.rs b/pallas-network/src/miniprotocols/mod.rs index 6c452db..dbbd59c 100644 --- a/pallas-network/src/miniprotocols/mod.rs +++ b/pallas-network/src/miniprotocols/mod.rs @@ -5,6 +5,7 @@ mod common; pub mod blockfetch; pub mod chainsync; pub mod handshake; +pub mod keepalive; pub mod localstate; pub mod localtxsubmission; pub mod txmonitor; diff --git a/pallas-network/src/multiplexer.rs b/pallas-network/src/multiplexer.rs index f848956..acbc375 100644 --- a/pallas-network/src/multiplexer.rs +++ b/pallas-network/src/multiplexer.rs @@ -80,6 +80,14 @@ impl Bearer { Ok(Self::Tcp(stream)) } + pub async fn connect_tcp_timeout(addr: impl ToSocketAddrs, timeout: std::time::Duration) -> Result { + match tokio::time::timeout(timeout, Self::connect_tcp(addr)).await { + Ok(Ok(stream)) => Ok(stream), + Ok(Err(err)) => Err(err), + Err(_) => Err(tokio::io::Error::new(tokio::io::ErrorKind::TimedOut, "connection timed out")), + } + } + pub async fn accept_tcp(listener: &TcpListener) -> tokio::io::Result<(Self, SocketAddr)> { let (stream, addr) = listener.accept().await?; Ok((Self::Tcp(stream), addr)) diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index 576566e..b6b401a 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -285,7 +285,7 @@ pub async fn chainsync_server_and_client_happy_path_n2n() { server_hs.receive_proposed_versions().await.unwrap(); server_hs - .accept_version(10, VersionData::new(0, false)) + .accept_version(10, VersionData::new(0, false, None, None)) .await .unwrap();