feat(network): add server-side facades (#282)

This commit is contained in:
Harper 2023-09-01 13:15:14 +01:00 committed by GitHub
parent fdf41b5b32
commit 7f919dcef5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 206 additions and 41 deletions

View file

@ -1,14 +1,17 @@
use std::path::Path;
use thiserror::Error;
use tokio::net::{TcpListener, UnixListener};
use tokio::task::JoinHandle;
use tracing::{debug, error};
use crate::miniprotocols::handshake::Confirmation;
use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber};
use crate::miniprotocols::PROTOCOL_N2N_HANDSHAKE;
use crate::{
miniprotocols::{
blockfetch, chainsync, handshake, localstate, PROTOCOL_N2C_CHAIN_SYNC,
PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY,
PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY, PROTOCOL_N2N_BLOCK_FETCH,
PROTOCOL_N2N_CHAIN_SYNC,
},
multiplexer::{self, Bearer},
};
@ -25,6 +28,7 @@ pub enum Error {
IncompatibleVersion,
}
/// Client of N2N Ouroboros
pub struct PeerClient {
plexer_handle: JoinHandle<Result<(), crate::multiplexer::Error>>,
pub handshake: handshake::Confirmation<handshake::n2n::VersionData>,
@ -81,6 +85,64 @@ impl PeerClient {
}
}
/// Server of N2N Ouroboros
pub struct PeerServer {
plexer_handle: JoinHandle<Result<(), crate::multiplexer::Error>>,
pub version: (VersionNumber, n2n::VersionData),
chainsync: chainsync::N2NServer,
blockfetch: blockfetch::Server,
}
impl PeerServer {
pub async fn accept(listener: &TcpListener, magic: u64) -> Result<Self, Error> {
let (bearer, _) = Bearer::accept_tcp(&listener)
.await
.map_err(Error::ConnectFailure)?;
let mut server_plexer = multiplexer::Plexer::new(bearer);
let hs_channel = server_plexer.subscribe_server(PROTOCOL_N2N_HANDSHAKE);
let cs_channel = server_plexer.subscribe_server(PROTOCOL_N2N_CHAIN_SYNC);
let bf_channel = server_plexer.subscribe_server(PROTOCOL_N2N_BLOCK_FETCH);
let mut server_hs: handshake::Server<n2n::VersionData> = handshake::Server::new(hs_channel);
let server_cs = chainsync::N2NServer::new(cs_channel);
let server_bf = blockfetch::Server::new(bf_channel);
let plexer_handle = tokio::spawn(async move { server_plexer.run().await });
let accepted_version = server_hs
.handshake(n2n::VersionTable::v7_and_above(magic))
.await
.map_err(Error::HandshakeProtocol)?;
if let Some(ver) = accepted_version {
Ok(Self {
plexer_handle,
version: ver,
chainsync: server_cs,
blockfetch: server_bf,
})
} else {
plexer_handle.abort();
Err(Error::IncompatibleVersion)
}
}
pub fn chainsync(&mut self) -> &mut chainsync::N2NServer {
&mut self.chainsync
}
pub fn blockfetch(&mut self) -> &mut blockfetch::Server {
&mut self.blockfetch
}
pub fn abort(&mut self) {
self.plexer_handle.abort();
}
}
/// Client of N2C Ouroboros
pub struct NodeClient {
plexer_handle: JoinHandle<Result<(), crate::multiplexer::Error>>,
pub handshake: handshake::Confirmation<handshake::n2c::VersionData>,
@ -179,3 +241,60 @@ impl NodeClient {
self.plexer_handle.abort();
}
}
/// Server of N2C Ouroboros
pub struct NodeServer {
plexer_handle: JoinHandle<Result<(), crate::multiplexer::Error>>,
pub version: (VersionNumber, n2c::VersionData),
chainsync: chainsync::N2CServer,
// statequery: localstate::Server,
}
impl NodeServer {
pub async fn accept(listener: &UnixListener, magic: u64) -> Result<Self, Error> {
let (bearer, _) = Bearer::accept_unix(listener)
.await
.map_err(Error::ConnectFailure)?;
let mut server_plexer = multiplexer::Plexer::new(bearer);
let hs_channel = server_plexer.subscribe_server(PROTOCOL_N2C_HANDSHAKE);
let cs_channel = server_plexer.subscribe_server(PROTOCOL_N2C_CHAIN_SYNC);
// let sq_channel = server_plexer.subscribe_server(PROTOCOL_N2C_STATE_QUERY);
let mut server_hs: handshake::Server<n2c::VersionData> = handshake::Server::new(hs_channel);
let server_cs = chainsync::N2CServer::new(cs_channel);
// let server_sq = localstate::Server::new(sq_channel);
let plexer_handle = tokio::spawn(async move { server_plexer.run().await });
let accepted_version = server_hs
.handshake(n2c::VersionTable::v10_and_above(magic))
.await
.map_err(Error::HandshakeProtocol)?;
if let Some(ver) = accepted_version {
Ok(Self {
plexer_handle,
version: ver,
chainsync: server_cs,
// statequery: server_sq
})
} else {
plexer_handle.abort();
Err(Error::IncompatibleVersion)
}
}
pub fn chainsync(&mut self) -> &mut chainsync::N2CServer {
&mut self.chainsync
}
// pub fn statequery(&mut self) -> &mut localstate::Server {
// &mut self.statequery
// }
pub fn abort(&mut self) {
self.plexer_handle.abort();
}
}

View file

@ -80,7 +80,7 @@ impl VersionTable {
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct VersionData(NetworkMagic, Option<bool>);
impl Encode<()> for VersionData {

View file

@ -57,7 +57,7 @@ impl VersionTable {
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct VersionData {
network_magic: u64,
initiator_and_responder_diffusion_mode: bool,

View file

@ -1,6 +1,7 @@
use std::marker::PhantomData;
use pallas_codec::Fragment;
use tracing::{debug, warn};
use super::{Error, Message, RefuseReason, State, VersionNumber, VersionTable};
use crate::multiplexer;
@ -9,7 +10,7 @@ pub struct Server<D>(State, multiplexer::ChannelBuffer, PhantomData<D>);
impl<D> Server<D>
where
D: std::fmt::Debug + Clone,
D: std::fmt::Debug + Clone + std::cmp::PartialEq,
Message<D>: Fragment,
{
pub fn new(channel: multiplexer::AgentChannel) -> Self {
@ -109,6 +110,70 @@ where
Ok(())
}
/// Perform a handshake with the client
///
/// Performs a full handshake with the client, where `versions` are the
/// acceptable versions supported by the server.
pub async fn handshake(
&mut self,
versions: VersionTable<D>,
) -> Result<Option<(VersionNumber, D)>, Error> {
// receive proposed versions
let client_versions = self
.receive_proposed_versions()
.await?
.values
.into_iter()
.collect::<Vec<(u64, D)>>();
// find highest intersect with our version table (TODO: improve)
let mut versions = versions.values.into_iter().collect::<Vec<(u64, D)>>();
versions.sort_by(|a, b| b.0.cmp(&a.0));
for (ver_num, ver_data) in versions.clone() {
for (client_ver_num, client_ver_data) in client_versions.clone() {
if ver_num == client_ver_num {
if ver_data == client_ver_data {
// found a version number and extra data match
debug!("accepting hs with ({}, {:?})", ver_num, ver_data);
self.accept_version(ver_num, ver_data.clone()).await?;
return Ok(Some((ver_num, ver_data)));
} else {
warn!(
"rejecting hs as params not acceptable - server: {:?}, client: {:?}",
ver_data, client_ver_data
);
// found version number match but extra data not acceptable
self.refuse(RefuseReason::Refused(
ver_num,
"Proposed extra params don't match".into(),
))
.await?;
return Ok(None);
}
}
}
}
warn!(
"rejecting hs as no version intersect found - server: {:?}, client: {:?}",
versions, client_versions
);
// failed to find a version number intersection
self.refuse(RefuseReason::VersionMismatch(
versions.into_iter().map(|(num, _)| num).collect(),
))
.await?;
Ok(None)
}
pub fn unwrap(self) -> multiplexer::AgentChannel {
self.1.unwrap()
}

View file

@ -6,7 +6,7 @@ use std::net::SocketAddr;
use std::path::Path;
use thiserror::Error;
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs, UnixListener};
use tokio::select;
use tokio::sync::mpsc::error::SendError;
use tokio::time::Instant;
@ -79,11 +79,19 @@ impl Bearer {
Ok(Self::Tcp(stream))
}
pub async fn accept_tcp(listener: TcpListener) -> tokio::io::Result<(Self, SocketAddr)> {
pub async fn accept_tcp(listener: &TcpListener) -> tokio::io::Result<(Self, SocketAddr)> {
let (stream, addr) = listener.accept().await?;
Ok((Self::Tcp(stream), addr))
}
#[cfg(not(target_os = "windows"))]
pub async fn accept_unix(
listener: &UnixListener,
) -> tokio::io::Result<(Self, tokio::net::unix::SocketAddr)> {
let (stream, addr) = listener.accept().await?;
Ok((Self::Unix(stream), addr))
}
#[cfg(not(target_os = "windows"))]
pub async fn connect_unix(path: impl AsRef<Path>) -> Result<Self, tokio::io::Error> {
let stream = UnixStream::connect(path).await?;

View file

@ -11,7 +11,7 @@ async fn setup_passive_muxer<const P: u16>() -> Plexer {
println!("listening for connections on port {}", P);
let (bearer, _) = Bearer::accept_tcp(server).await.unwrap();
let (bearer, _) = Bearer::accept_tcp(&server).await.unwrap();
Plexer::new(bearer)
}

View file

@ -1,17 +1,14 @@
use std::net::{Ipv4Addr, SocketAddrV4};
use std::time::Duration;
use pallas_network::facades::PeerClient;
use pallas_network::facades::{PeerClient, PeerServer};
use pallas_network::miniprotocols::blockfetch::BlockRequest;
use pallas_network::miniprotocols::chainsync::{ClientRequest, HeaderContent, Tip};
use pallas_network::miniprotocols::handshake;
use pallas_network::miniprotocols::handshake::n2n::VersionData;
use pallas_network::miniprotocols::{
blockfetch,
chainsync::{self, NextResponse},
Point,
};
use pallas_network::multiplexer::{Bearer, Plexer};
use tokio::net::TcpListener;
#[tokio::test]
@ -175,21 +172,9 @@ pub async fn blockfetch_server_and_client_happy_path() {
.await
.unwrap();
let (bearer, _) = Bearer::accept_tcp(server_listener).await.unwrap();
let mut peer_server = PeerServer::accept(&server_listener, 0).await.unwrap();
let mut server_plexer = Plexer::new(bearer);
let mut server_hs: handshake::Server<VersionData> =
handshake::Server::new(server_plexer.subscribe_server(0));
let mut server_bf = blockfetch::Server::new(server_plexer.subscribe_server(3));
tokio::spawn(async move { server_plexer.run().await });
server_hs.receive_proposed_versions().await.unwrap();
server_hs
.accept_version(10, VersionData::new(0, false))
.await
.unwrap();
let server_bf = peer_server.blockfetch();
// server receives range from client, sends blocks
@ -278,21 +263,9 @@ pub async fn chainsync_server_and_client_happy_path_n2n() {
.await
.unwrap();
let (bearer, _) = Bearer::accept_tcp(server_listener).await.unwrap();
let mut peer_server = PeerServer::accept(&server_listener, 0).await.unwrap();
let mut server_plexer = Plexer::new(bearer);
let mut server_hs: handshake::Server<VersionData> =
handshake::Server::new(server_plexer.subscribe_server(0));
let mut server_cs = chainsync::N2NServer::new(server_plexer.subscribe_server(2));
tokio::spawn(async move { server_plexer.run().await });
server_hs.receive_proposed_versions().await.unwrap();
server_hs
.accept_version(10, VersionData::new(0, false))
.await
.unwrap();
let server_cs = peer_server.chainsync();
// server receives find intersect from client, sends intersect point
@ -386,7 +359,7 @@ pub async fn chainsync_server_and_client_happy_path_n2n() {
});
let client = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(2)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
// client setup