From a6a6ffcfed7030be5f947c93c2d0d7d489a73866 Mon Sep 17 00:00:00 2001 From: Harper <25673452+jmhrpr@users.noreply.github.com> Date: Mon, 8 Jan 2024 10:50:37 +0000 Subject: [PATCH] feat(network): implement server side KeepAlive (#376) --- examples/n2n-miniprotocols/src/main.rs | 2 +- pallas-network/src/facades.rs | 18 ++- .../src/miniprotocols/keepalive/client.rs | 37 ++--- .../src/miniprotocols/keepalive/mod.rs | 2 + .../src/miniprotocols/keepalive/server.rs | 129 ++++++++++++++++++ 5 files changed, 165 insertions(+), 23 deletions(-) create mode 100644 pallas-network/src/miniprotocols/keepalive/server.rs diff --git a/examples/n2n-miniprotocols/src/main.rs b/examples/n2n-miniprotocols/src/main.rs index 6ff26cb..dfb950f 100644 --- a/examples/n2n-miniprotocols/src/main.rs +++ b/examples/n2n-miniprotocols/src/main.rs @@ -21,7 +21,7 @@ pub enum Error { ChainSyncError(#[from] chainsync::ClientError), #[error("keepalive error")] - KeepAliveError(#[from] keepalive::Error), + KeepAliveError(#[from] keepalive::ClientError), #[error("pallas_traverse error")] PallasTraverseError(#[from] pallas::ledger::traverse::Error), diff --git a/pallas-network/src/facades.rs b/pallas-network/src/facades.rs index cec5992..06df821 100644 --- a/pallas-network/src/facades.rs +++ b/pallas-network/src/facades.rs @@ -125,11 +125,12 @@ impl PeerClient { /// Server of N2N Ouroboros pub struct PeerServer { - plexer: RunningPlexer, - handshake: handshake::N2NServer, - chainsync: chainsync::N2NServer, - blockfetch: blockfetch::Server, - txsubmission: txsubmission::Server, + pub plexer: RunningPlexer, + pub handshake: handshake::N2NServer, + pub chainsync: chainsync::N2NServer, + pub blockfetch: blockfetch::Server, + pub txsubmission: txsubmission::Server, + pub keepalive: keepalive::Server, accepted_address: Option, accepted_version: Option, } @@ -142,11 +143,13 @@ impl PeerServer { let cs_channel = plexer.subscribe_server(PROTOCOL_N2N_CHAIN_SYNC); let bf_channel = plexer.subscribe_server(PROTOCOL_N2N_BLOCK_FETCH); let txsub_channel = plexer.subscribe_server(PROTOCOL_N2N_TX_SUBMISSION); + let keepalive_channel = plexer.subscribe_server(PROTOCOL_N2N_KEEP_ALIVE); let hs = handshake::N2NServer::new(hs_channel); let cs = chainsync::N2NServer::new(cs_channel); let bf = blockfetch::Server::new(bf_channel); let txsub = txsubmission::Server::new(txsub_channel); + let keepalive = keepalive::Server::new(keepalive_channel); let plexer = plexer.spawn(); @@ -156,6 +159,7 @@ impl PeerServer { chainsync: cs, blockfetch: bf, txsubmission: txsub, + keepalive, accepted_address: None, accepted_version: None, } @@ -200,6 +204,10 @@ impl PeerServer { &mut self.txsubmission } + pub fn keepalive(&mut self) -> &mut keepalive::Server { + &mut self.keepalive + } + pub async fn abort(self) { self.plexer.abort().await } diff --git a/pallas-network/src/miniprotocols/keepalive/client.rs b/pallas-network/src/miniprotocols/keepalive/client.rs index 0143507..8c4e35d 100644 --- a/pallas-network/src/miniprotocols/keepalive/client.rs +++ b/pallas-network/src/miniprotocols/keepalive/client.rs @@ -7,7 +7,7 @@ use super::protocol::*; use crate::multiplexer; #[derive(Error, Debug)] -pub enum Error { +pub enum ClientError { #[error("attempted to receive message while agency is ours")] AgencyIsOurs, @@ -58,54 +58,57 @@ impl Client { } } - fn assert_agency_is_ours(&self) -> Result<(), Error> { + fn assert_agency_is_ours(&self) -> Result<(), ClientError> { if !self.has_agency() { - Err(Error::AgencyIsTheirs) + Err(ClientError::AgencyIsTheirs) } else { Ok(()) } } - fn assert_agency_is_theirs(&self) -> Result<(), Error> { + fn assert_agency_is_theirs(&self) -> Result<(), ClientError> { if self.has_agency() { - Err(Error::AgencyIsOurs) + Err(ClientError::AgencyIsOurs) } else { Ok(()) } } - fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { + fn assert_outbound_state(&self, msg: &Message) -> Result<(), ClientError> { match (&self.0, msg) { (State::Client, Message::KeepAlive(..)) => Ok(()), (State::Client, Message::Done) => Ok(()), - _ => Err(Error::InvalidOutbound), + _ => Err(ClientError::InvalidOutbound), } } - fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { + fn assert_inbound_state(&self, msg: &Message) -> Result<(), ClientError> { match (&self.0, msg) { (State::Server, Message::ResponseKeepAlive(..)) => Ok(()), - _ => Err(Error::InvalidInbound), + _ => Err(ClientError::InvalidInbound), } } - pub async fn send_message(&mut self, msg: &Message) -> Result<(), Error> { + pub async fn send_message(&mut self, msg: &Message) -> Result<(), ClientError> { self.assert_agency_is_ours()?; self.assert_outbound_state(msg)?; - self.1.send_msg_chunks(msg).await.map_err(Error::Plexer)?; + self.1 + .send_msg_chunks(msg) + .await + .map_err(ClientError::Plexer)?; Ok(()) } - pub async fn recv_message(&mut self) -> Result { + pub async fn recv_message(&mut self) -> Result { self.assert_agency_is_theirs()?; - let msg = self.1.recv_full_msg().await.map_err(Error::Plexer)?; + let msg = self.1.recv_full_msg().await.map_err(ClientError::Plexer)?; self.assert_inbound_state(&msg)?; Ok(msg) } - pub async fn send_keepalive(&mut self) -> Result<(), Error> { + pub async fn send_keepalive(&mut self) -> Result<(), ClientError> { // generate random cookie value let cookie = rand::thread_rng().gen::(); let msg = Message::KeepAlive(cookie); @@ -119,7 +122,7 @@ impl Client { Ok(()) } - async fn recv_while_sending_keepalive(&mut self) -> Result<(), Error> { + async fn recv_while_sending_keepalive(&mut self) -> Result<(), ClientError> { match self.recv_message().await? { Message::ResponseKeepAlive(cookie) => { debug!("received keepalive response with cookie {}", cookie); @@ -127,10 +130,10 @@ impl Client { self.0 = State::Client; Ok(()) } else { - Err(Error::KeepAliveCookieMismatch) + Err(ClientError::KeepAliveCookieMismatch) } } - _ => Err(Error::InvalidInbound), + _ => Err(ClientError::InvalidInbound), } } } diff --git a/pallas-network/src/miniprotocols/keepalive/mod.rs b/pallas-network/src/miniprotocols/keepalive/mod.rs index 9bbd8cc..bc68dde 100644 --- a/pallas-network/src/miniprotocols/keepalive/mod.rs +++ b/pallas-network/src/miniprotocols/keepalive/mod.rs @@ -1,6 +1,8 @@ mod client; mod codec; mod protocol; +mod server; pub use client::*; pub use protocol::*; +pub use server::*; diff --git a/pallas-network/src/miniprotocols/keepalive/server.rs b/pallas-network/src/miniprotocols/keepalive/server.rs new file mode 100644 index 0000000..eb354ea --- /dev/null +++ b/pallas-network/src/miniprotocols/keepalive/server.rs @@ -0,0 +1,129 @@ +use std::fmt::Debug; +use thiserror::*; +use tracing::debug; + +use super::protocol::*; +use crate::multiplexer; + +#[derive(Error, Debug)] +pub enum ServerError { + #[error("attempted to receive message while agency is ours")] + AgencyIsOurs, + + #[error("attempted to send message while agency is theirs")] + AgencyIsTheirs, + + #[error("inbound message is not valid for current state")] + InvalidInbound, + + #[error("outbound message is not valid for current state")] + InvalidOutbound, + + #[error("error while sending or receiving data through the channel")] + Plexer(multiplexer::Error), +} + +pub struct Server(State, multiplexer::ChannelBuffer); + +impl Server { + pub fn new(channel: multiplexer::AgentChannel) -> Self { + Self(State::Client, multiplexer::ChannelBuffer::new(channel)) + } + + pub fn state(&self) -> &State { + &self.0 + } + + pub fn is_done(&self) -> bool { + self.0 == State::Done + } + + fn has_agency(&self) -> bool { + match &self.0 { + State::Client => false, + State::Server => true, + State::Done => false, + } + } + + fn assert_agency_is_ours(&self) -> Result<(), ServerError> { + if !self.has_agency() { + Err(ServerError::AgencyIsTheirs) + } else { + Ok(()) + } + } + + fn assert_agency_is_theirs(&self) -> Result<(), ServerError> { + if self.has_agency() { + Err(ServerError::AgencyIsOurs) + } else { + Ok(()) + } + } + + fn assert_outbound_state(&self, msg: &Message) -> Result<(), ServerError> { + match (&self.0, msg) { + (State::Server, Message::ResponseKeepAlive(..)) => Ok(()), + + _ => Err(ServerError::InvalidOutbound), + } + } + + fn assert_inbound_state(&self, msg: &Message) -> Result<(), ServerError> { + match (&self.0, msg) { + (State::Client, Message::KeepAlive(..)) => Ok(()), + (State::Client, Message::Done) => Ok(()), + _ => Err(ServerError::InvalidInbound), + } + } + + pub async fn send_message(&mut self, msg: &Message) -> Result<(), ServerError> { + self.assert_agency_is_ours()?; + self.assert_outbound_state(msg)?; + self.1 + .send_msg_chunks(msg) + .await + .map_err(ServerError::Plexer)?; + + Ok(()) + } + + pub async fn recv_message(&mut self) -> Result { + self.assert_agency_is_theirs()?; + let msg = self.1.recv_full_msg().await.map_err(ServerError::Plexer)?; + self.assert_inbound_state(&msg)?; + + Ok(msg) + } + + pub async fn send_keepalive_response( + &mut self, + cookie: KeepAliveCookie, + ) -> Result<(), ServerError> { + let msg = Message::ResponseKeepAlive(cookie); + self.send_message(&msg).await?; + self.0 = State::Client; + debug!("sent keepalive response message with cookie {}", cookie); + + Ok(()) + } + + pub async fn keepalive_receive_and_respond(&mut self) -> Result, ServerError> { + match self.recv_message().await? { + Message::KeepAlive(cookie) => { + debug!("received keepalive message with cookie {}", cookie); + + self.0 = State::Server; + Some(self.send_keepalive_response(cookie).await).transpose() + } + Message::Done => { + debug!("client sent done message in keepalive protocol"); + + self.0 = State::Done; + Ok(None) + } + _ => Err(ServerError::InvalidInbound), + } + } +}