From 9ea5d7cefe26743b3ab38412946d7c4649903054 Mon Sep 17 00:00:00 2001 From: Harper Date: Wed, 9 Aug 2023 10:46:45 +0100 Subject: [PATCH] feat(network): add server side of blockfetch miniprotocol (#275) --- .../src/miniprotocols/blockfetch/mod.rs | 2 + .../src/miniprotocols/blockfetch/server.rs | 190 ++++++++++++++++++ pallas-network/tests/protocols.rs | 118 +++++++++++ 3 files changed, 310 insertions(+) create mode 100644 pallas-network/src/miniprotocols/blockfetch/server.rs diff --git a/pallas-network/src/miniprotocols/blockfetch/mod.rs b/pallas-network/src/miniprotocols/blockfetch/mod.rs index 6326235..26cb050 100644 --- a/pallas-network/src/miniprotocols/blockfetch/mod.rs +++ b/pallas-network/src/miniprotocols/blockfetch/mod.rs @@ -3,7 +3,9 @@ mod client; mod codec; mod protocol; +mod server; pub use client::*; pub use codec::*; pub use protocol::*; +pub use server::*; diff --git a/pallas-network/src/miniprotocols/blockfetch/server.rs b/pallas-network/src/miniprotocols/blockfetch/server.rs new file mode 100644 index 0000000..d346a3b --- /dev/null +++ b/pallas-network/src/miniprotocols/blockfetch/server.rs @@ -0,0 +1,190 @@ +use thiserror::Error; + +use crate::multiplexer; + +use super::{Body, Message, Range, State}; + +#[derive(Error, Debug)] +pub enum Error { + #[error("attempted to receive message while agency is ours")] + AgencyIsOurs, + + #[error("attempted to send message while agency is theirs")] + AgencyIsTheirs, + + #[error("inbound message is not valid for current state")] + InvalidInbound, + + #[error("outbound message is not valid for current state")] + InvalidOutbound, + + #[error("error while sending or receiving data through the multiplexer")] + Plexer(multiplexer::Error), +} + +#[derive(Debug)] +pub struct BlockRequest(pub Range); + +/// Represents the server for the BlockFetch mini-protocol. +pub struct Server(State, multiplexer::ChannelBuffer); + +impl Server { + /// Create a new BlockFetch server from a multiplexer agent channel. + /// + /// # Arguments + /// + /// * `channel` - A multiplexer agent channel used for communication with + /// the server. + pub fn new(channel: multiplexer::AgentChannel) -> Self { + Self(State::Idle, multiplexer::ChannelBuffer::new(channel)) + } + + /// Get the current state of the server. + /// + /// Returns the current state of the server. + pub fn state(&self) -> &State { + &self.0 + } + + /// Check if the server is done. + /// + /// Returns true if server is in the `Done` state, false otherwise. + pub fn is_done(&self) -> bool { + self.0 == State::Done + } + + fn has_agency(&self) -> bool { + match self.state() { + State::Idle => false, + State::Busy => true, + State::Streaming => true, + State::Done => false, + } + } + + fn assert_agency_is_ours(&self) -> Result<(), Error> { + if !self.has_agency() { + Err(Error::AgencyIsTheirs) + } else { + Ok(()) + } + } + + fn assert_agency_is_theirs(&self) -> Result<(), Error> { + if self.has_agency() { + Err(Error::AgencyIsOurs) + } else { + Ok(()) + } + } + + fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { + match (&self.0, msg) { + (State::Busy, Message::NoBlocks) => Ok(()), + (State::Busy, Message::StartBatch) => Ok(()), + (State::Streaming, Message::Block { .. }) => Ok(()), + (State::Streaming, Message::BatchDone) => Ok(()), + _ => Err(Error::InvalidOutbound), + } + } + + fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { + match (&self.0, msg) { + (State::Idle, Message::RequestRange { .. }) => Ok(()), + (State::Idle, Message::ClientDone) => Ok(()), + _ => Err(Error::InvalidInbound), + } + } + + pub async fn send_message(&mut self, msg: &Message) -> Result<(), Error> { + self.assert_agency_is_ours()?; + self.assert_outbound_state(msg)?; + self.1.send_msg_chunks(msg).await.map_err(Error::Plexer)?; + + Ok(()) + } + + pub async fn recv_message(&mut self) -> Result { + self.assert_agency_is_theirs()?; + let msg = self.1.recv_full_msg().await.map_err(Error::Plexer)?; + self.assert_inbound_state(&msg)?; + + Ok(msg) + } + + pub async fn send_start_batch(&mut self) -> Result<(), Error> { + let msg = Message::StartBatch; + self.send_message(&msg).await?; + self.0 = State::Streaming; + + Ok(()) + } + + pub async fn send_no_blocks(&mut self) -> Result<(), Error> { + let msg = Message::NoBlocks; + self.send_message(&msg).await?; + self.0 = State::Idle; + + Ok(()) + } + + pub async fn send_block(&mut self, body: Body) -> Result<(), Error> { + let msg = Message::Block { body }; + self.send_message(&msg).await?; + + Ok(()) + } + + pub async fn send_batch_done(&mut self) -> Result<(), Error> { + let msg = Message::BatchDone; + self.send_message(&msg).await?; + self.0 = State::Idle; + + Ok(()) + } + + /// Receive a message from the client while the miniprotocol is in the + /// `Idle` state. + /// + /// If the message is a `RequestRange`, return the requested range and + /// progess the server state to `Busy`. If the message is a `ClientDone`, + /// return None and progress the server state to `Done`. For any other + /// incoming message type return an `Error`. + pub async fn recv_while_idle(&mut self) -> Result, Error> { + match self.recv_message().await? { + Message::RequestRange { range } => { + self.0 = State::Busy; + + Ok(Some(BlockRequest(range))) + } + Message::ClientDone => { + self.0 = State::Done; + + Ok(None) + } + _ => Err(Error::InvalidInbound), + } + } + + /// Return a range of blocks to the client, starting in the `Busy` state and + /// progressing the state machine as required to send all the blocks to the + /// client. + /// + /// # Arguments + /// + /// * `blocks` - Ordered list of block bodies corresponding to the client's + /// requested range. + pub async fn send_block_range(&mut self, blocks: Vec) -> Result<(), Error> { + if blocks.is_empty() { + self.send_no_blocks().await + } else { + self.send_start_batch().await?; + + for block in blocks { + self.send_block(block).await?; + } + + self.send_batch_done().await + } + } +} diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index 8f6d922..2c52658 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -1,9 +1,17 @@ +use std::net::{Ipv4Addr, SocketAddrV4}; +use std::time::Duration; + use pallas_network::facades::PeerClient; +use pallas_network::miniprotocols::blockfetch::BlockRequest; +use pallas_network::miniprotocols::handshake; +use pallas_network::miniprotocols::handshake::n2n::VersionData; use pallas_network::miniprotocols::{ blockfetch, chainsync::{self, NextResponse}, Point, }; +use pallas_network::multiplexer::{Bearer, Plexer}; +use tokio::net::TcpListener; #[tokio::test] #[ignore] @@ -143,4 +151,114 @@ pub async fn blockfetch_happy_path() { assert!(matches!(client.state(), blockfetch::State::Done)); } +#[tokio::test] +#[ignore] +pub async fn blockfetch_server_and_client_happy_path() { + let block_bodies = vec![ + hex::decode("deadbeefdeadbeef").unwrap(), + hex::decode("c0ffeec0ffeec0ffee").unwrap(), + ]; + + let point = Point::Specific( + 1337, + hex::decode("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef").unwrap(), + ); + + let server = tokio::spawn({ + let bodies = block_bodies.clone(); + let point = point.clone(); + async move { + // server setup + + let server_listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30001)) + .await + .unwrap(); + + let (bearer, _) = Bearer::accept_tcp(server_listener).await.unwrap(); + + let mut server_plexer = Plexer::new(bearer); + + let mut server_hs: handshake::Server = + handshake::Server::new(server_plexer.subscribe_server(0)); + let mut server_bf = blockfetch::Server::new(server_plexer.subscribe_server(3)); + + tokio::spawn(async move { server_plexer.run().await }); + + server_hs.receive_proposed_versions().await.unwrap(); + server_hs + .accept_version(10, VersionData::new(0, false)) + .await + .unwrap(); + + // server receives range from client, sends blocks + + let BlockRequest(range_request) = server_bf.recv_while_idle().await.unwrap().unwrap(); + + assert_eq!(range_request, (point.clone(), point.clone())); + assert_eq!(*server_bf.state(), blockfetch::State::Busy); + + server_bf.send_block_range(bodies).await.unwrap(); + + assert_eq!(*server_bf.state(), blockfetch::State::Idle); + + // server receives range from client, sends NoBlocks + + let BlockRequest(_) = server_bf.recv_while_idle().await.unwrap().unwrap(); + + server_bf.send_block_range(vec![]).await.unwrap(); + + assert_eq!(*server_bf.state(), blockfetch::State::Idle); + + assert!(server_bf.recv_while_idle().await.unwrap().is_none()); + + assert_eq!(*server_bf.state(), blockfetch::State::Done); + } + }); + + let client = tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(1)).await; + + // client setup + + let mut client_to_server_conn = PeerClient::connect("localhost:30001", 0).await.unwrap(); + + let client_bf = client_to_server_conn.blockfetch(); + + // client sends request range + + client_bf + .send_request_range((point.clone(), point.clone())) + .await + .unwrap(); + + assert!(client_bf.recv_while_busy().await.unwrap().is_some()); + + // client receives blocks until idle + + let mut received_bodies = Vec::new(); + + while let Some(received_body) = client_bf.recv_while_streaming().await.unwrap() { + received_bodies.push(received_body) + } + + assert_eq!(received_bodies, block_bodies); + + // client sends request range + + client_bf + .send_request_range((point.clone(), point.clone())) + .await + .unwrap(); + + // recv_while_busy returns None for NoBlocks message + assert!(client_bf.recv_while_busy().await.unwrap().is_none()); + + // client sends done + + client_bf.send_done().await.unwrap(); + }); + + _ = tokio::join!(client, server); +} + // TODO: redo txsubmission client test