feat(network): add server side of blockfetch miniprotocol (#275)

This commit is contained in:
Harper 2023-08-09 10:46:45 +01:00 committed by GitHub
parent e117a2723a
commit 9ea5d7cefe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 310 additions and 0 deletions

View file

@ -3,7 +3,9 @@
mod client;
mod codec;
mod protocol;
mod server;
pub use client::*;
pub use codec::*;
pub use protocol::*;
pub use server::*;

View file

@ -0,0 +1,190 @@
use thiserror::Error;
use crate::multiplexer;
use super::{Body, Message, Range, State};
#[derive(Error, Debug)]
pub enum Error {
#[error("attempted to receive message while agency is ours")]
AgencyIsOurs,
#[error("attempted to send message while agency is theirs")]
AgencyIsTheirs,
#[error("inbound message is not valid for current state")]
InvalidInbound,
#[error("outbound message is not valid for current state")]
InvalidOutbound,
#[error("error while sending or receiving data through the multiplexer")]
Plexer(multiplexer::Error),
}
#[derive(Debug)]
pub struct BlockRequest(pub Range);
/// Represents the server for the BlockFetch mini-protocol.
pub struct Server(State, multiplexer::ChannelBuffer);
impl Server {
/// Create a new BlockFetch server from a multiplexer agent channel.
///
/// # Arguments
///
/// * `channel` - A multiplexer agent channel used for communication with
/// the server.
pub fn new(channel: multiplexer::AgentChannel) -> Self {
Self(State::Idle, multiplexer::ChannelBuffer::new(channel))
}
/// Get the current state of the server.
///
/// Returns the current state of the server.
pub fn state(&self) -> &State {
&self.0
}
/// Check if the server is done.
///
/// Returns true if server is in the `Done` state, false otherwise.
pub fn is_done(&self) -> bool {
self.0 == State::Done
}
fn has_agency(&self) -> bool {
match self.state() {
State::Idle => false,
State::Busy => true,
State::Streaming => true,
State::Done => false,
}
}
fn assert_agency_is_ours(&self) -> Result<(), Error> {
if !self.has_agency() {
Err(Error::AgencyIsTheirs)
} else {
Ok(())
}
}
fn assert_agency_is_theirs(&self) -> Result<(), Error> {
if self.has_agency() {
Err(Error::AgencyIsOurs)
} else {
Ok(())
}
}
fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> {
match (&self.0, msg) {
(State::Busy, Message::NoBlocks) => Ok(()),
(State::Busy, Message::StartBatch) => Ok(()),
(State::Streaming, Message::Block { .. }) => Ok(()),
(State::Streaming, Message::BatchDone) => Ok(()),
_ => Err(Error::InvalidOutbound),
}
}
fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> {
match (&self.0, msg) {
(State::Idle, Message::RequestRange { .. }) => Ok(()),
(State::Idle, Message::ClientDone) => Ok(()),
_ => Err(Error::InvalidInbound),
}
}
pub async fn send_message(&mut self, msg: &Message) -> Result<(), Error> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1.send_msg_chunks(msg).await.map_err(Error::Plexer)?;
Ok(())
}
pub async fn recv_message(&mut self) -> Result<Message, Error> {
self.assert_agency_is_theirs()?;
let msg = self.1.recv_full_msg().await.map_err(Error::Plexer)?;
self.assert_inbound_state(&msg)?;
Ok(msg)
}
pub async fn send_start_batch(&mut self) -> Result<(), Error> {
let msg = Message::StartBatch;
self.send_message(&msg).await?;
self.0 = State::Streaming;
Ok(())
}
pub async fn send_no_blocks(&mut self) -> Result<(), Error> {
let msg = Message::NoBlocks;
self.send_message(&msg).await?;
self.0 = State::Idle;
Ok(())
}
pub async fn send_block(&mut self, body: Body) -> Result<(), Error> {
let msg = Message::Block { body };
self.send_message(&msg).await?;
Ok(())
}
pub async fn send_batch_done(&mut self) -> Result<(), Error> {
let msg = Message::BatchDone;
self.send_message(&msg).await?;
self.0 = State::Idle;
Ok(())
}
/// Receive a message from the client while the miniprotocol is in the
/// `Idle` state.
///
/// If the message is a `RequestRange`, return the requested range and
/// progess the server state to `Busy`. If the message is a `ClientDone`,
/// return None and progress the server state to `Done`. For any other
/// incoming message type return an `Error`.
pub async fn recv_while_idle(&mut self) -> Result<Option<BlockRequest>, Error> {
match self.recv_message().await? {
Message::RequestRange { range } => {
self.0 = State::Busy;
Ok(Some(BlockRequest(range)))
}
Message::ClientDone => {
self.0 = State::Done;
Ok(None)
}
_ => Err(Error::InvalidInbound),
}
}
/// Return a range of blocks to the client, starting in the `Busy` state and
/// progressing the state machine as required to send all the blocks to the
/// client.
///
/// # Arguments
///
/// * `blocks` - Ordered list of block bodies corresponding to the client's
/// requested range.
pub async fn send_block_range(&mut self, blocks: Vec<Body>) -> Result<(), Error> {
if blocks.is_empty() {
self.send_no_blocks().await
} else {
self.send_start_batch().await?;
for block in blocks {
self.send_block(block).await?;
}
self.send_batch_done().await
}
}
}

View file

@ -1,9 +1,17 @@
use std::net::{Ipv4Addr, SocketAddrV4};
use std::time::Duration;
use pallas_network::facades::PeerClient;
use pallas_network::miniprotocols::blockfetch::BlockRequest;
use pallas_network::miniprotocols::handshake;
use pallas_network::miniprotocols::handshake::n2n::VersionData;
use pallas_network::miniprotocols::{
blockfetch,
chainsync::{self, NextResponse},
Point,
};
use pallas_network::multiplexer::{Bearer, Plexer};
use tokio::net::TcpListener;
#[tokio::test]
#[ignore]
@ -143,4 +151,114 @@ pub async fn blockfetch_happy_path() {
assert!(matches!(client.state(), blockfetch::State::Done));
}
#[tokio::test]
#[ignore]
pub async fn blockfetch_server_and_client_happy_path() {
let block_bodies = vec![
hex::decode("deadbeefdeadbeef").unwrap(),
hex::decode("c0ffeec0ffeec0ffee").unwrap(),
];
let point = Point::Specific(
1337,
hex::decode("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef").unwrap(),
);
let server = tokio::spawn({
let bodies = block_bodies.clone();
let point = point.clone();
async move {
// server setup
let server_listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30001))
.await
.unwrap();
let (bearer, _) = Bearer::accept_tcp(server_listener).await.unwrap();
let mut server_plexer = Plexer::new(bearer);
let mut server_hs: handshake::Server<VersionData> =
handshake::Server::new(server_plexer.subscribe_server(0));
let mut server_bf = blockfetch::Server::new(server_plexer.subscribe_server(3));
tokio::spawn(async move { server_plexer.run().await });
server_hs.receive_proposed_versions().await.unwrap();
server_hs
.accept_version(10, VersionData::new(0, false))
.await
.unwrap();
// server receives range from client, sends blocks
let BlockRequest(range_request) = server_bf.recv_while_idle().await.unwrap().unwrap();
assert_eq!(range_request, (point.clone(), point.clone()));
assert_eq!(*server_bf.state(), blockfetch::State::Busy);
server_bf.send_block_range(bodies).await.unwrap();
assert_eq!(*server_bf.state(), blockfetch::State::Idle);
// server receives range from client, sends NoBlocks
let BlockRequest(_) = server_bf.recv_while_idle().await.unwrap().unwrap();
server_bf.send_block_range(vec![]).await.unwrap();
assert_eq!(*server_bf.state(), blockfetch::State::Idle);
assert!(server_bf.recv_while_idle().await.unwrap().is_none());
assert_eq!(*server_bf.state(), blockfetch::State::Done);
}
});
let client = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
// client setup
let mut client_to_server_conn = PeerClient::connect("localhost:30001", 0).await.unwrap();
let client_bf = client_to_server_conn.blockfetch();
// client sends request range
client_bf
.send_request_range((point.clone(), point.clone()))
.await
.unwrap();
assert!(client_bf.recv_while_busy().await.unwrap().is_some());
// client receives blocks until idle
let mut received_bodies = Vec::new();
while let Some(received_body) = client_bf.recv_while_streaming().await.unwrap() {
received_bodies.push(received_body)
}
assert_eq!(received_bodies, block_bodies);
// client sends request range
client_bf
.send_request_range((point.clone(), point.clone()))
.await
.unwrap();
// recv_while_busy returns None for NoBlocks message
assert!(client_bf.recv_while_busy().await.unwrap().is_none());
// client sends done
client_bf.send_done().await.unwrap();
});
_ = tokio::join!(client, server);
}
// TODO: redo txsubmission client test