refactor: Improve network module naming (#245)

This commit is contained in:
Santiago Carmuega 2023-04-11 01:21:11 +02:00 committed by GitHub
parent cb0348b47a
commit e46b152786
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 251 additions and 261 deletions

View file

@ -1,201 +0,0 @@
use std::net::SocketAddr;
use std::path::Path;
use byteorder::{ByteOrder, NetworkEndian};
use thiserror::Error;
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs, UnixStream};
use tokio::time::Instant;
use tracing::trace;
const HEADER_LEN: usize = 8;
pub type Timestamp = u32;
pub type Payload = Vec<u8>;
pub type Protocol = u16;
#[derive(Debug)]
pub struct Header {
pub protocol: Protocol,
pub timestamp: Timestamp,
pub payload_len: u16,
}
impl From<&[u8]> for Header {
fn from(value: &[u8]) -> Self {
let timestamp = NetworkEndian::read_u32(&value[0..4]);
let protocol = NetworkEndian::read_u16(&value[4..6]);
let payload_len = NetworkEndian::read_u16(&value[6..8]);
Self {
timestamp,
protocol,
payload_len,
}
}
}
impl From<Header> for [u8; 8] {
fn from(value: Header) -> Self {
let mut out = [0u8; 8];
NetworkEndian::write_u32(&mut out[0..4], value.timestamp);
NetworkEndian::write_u16(&mut out[4..6], value.protocol);
NetworkEndian::write_u16(&mut out[6..8], value.payload_len);
out
}
}
pub struct Segment {
pub header: Header,
pub payload: Payload,
}
pub enum Bearer {
Tcp(TcpStream),
Unix(UnixStream),
}
const BUFFER_LEN: usize = 1024 * 10;
impl Bearer {
pub async fn connect_tcp(addr: impl ToSocketAddrs) -> Result<Self, tokio::io::Error> {
let stream = TcpStream::connect(addr).await?;
Ok(Self::Tcp(stream))
}
pub async fn accept_tcp(listener: TcpListener) -> tokio::io::Result<(Self, SocketAddr)> {
let (stream, addr) = listener.accept().await?;
Ok((Self::Tcp(stream), addr))
}
pub async fn connect_unix(path: impl AsRef<Path>) -> Result<Self, tokio::io::Error> {
let stream = UnixStream::connect(path).await?;
Ok(Self::Unix(stream))
}
pub async fn readable(&self) -> tokio::io::Result<()> {
match self {
Bearer::Tcp(x) => x.readable().await,
Bearer::Unix(x) => x.readable().await,
}
}
fn try_read(&mut self, buf: &mut [u8]) -> tokio::io::Result<usize> {
match self {
Bearer::Tcp(x) => x.try_read(buf),
Bearer::Unix(x) => x.try_read(buf),
}
}
async fn write_all(&mut self, buf: &[u8]) -> tokio::io::Result<()> {
match self {
Bearer::Tcp(x) => x.write_all(buf).await,
Bearer::Unix(x) => x.write_all(buf).await,
}
}
async fn flush(&mut self) -> tokio::io::Result<()> {
match self {
Bearer::Tcp(x) => x.flush().await,
Bearer::Unix(x) => x.flush().await,
}
}
}
#[derive(Debug, Error)]
pub enum Error {
#[error("no data available in bearer to complete segment")]
NoData,
#[error("unexpected I/O error")]
Io(#[source] tokio::io::Error),
}
pub struct SegmentBuffer(Bearer, Vec<u8>);
impl SegmentBuffer {
pub fn new(bearer: Bearer) -> Self {
Self(bearer, Vec::with_capacity(BUFFER_LEN))
}
/// Cancel-safe loop that reads from bearer until certain len
async fn cancellable_read(&mut self, required: usize) -> Result<(), Error> {
loop {
self.0.readable().await.map_err(Error::Io)?;
trace!("bearer is readable");
let remaining = required - self.1.len();
let mut buf = vec![0u8; remaining];
match self.0.try_read(&mut buf) {
Ok(0) => break Err(Error::NoData),
Ok(n) => {
trace!(n, "found data on bearer");
self.1.extend_from_slice(&buf[0..n]);
if self.1.len() >= required {
break Ok(());
}
}
Err(ref e) if e.kind() == tokio::io::ErrorKind::WouldBlock => {
trace!("reading from bearer would block");
continue;
}
Err(e) => {
return Err(Error::Io(e));
}
}
}
}
/// Peek the available data in search for a frame header
async fn peek_header(&mut self) -> Result<Header, Error> {
trace!("waiting for header buf");
self.cancellable_read(HEADER_LEN).await?;
trace!("found enough data for header");
let header = &self.1[..HEADER_LEN];
Ok(Header::from(header))
}
// Cancel-safe read of a full segment from the bearer
pub async fn read_segment(&mut self) -> Result<(Protocol, Payload), Error> {
let header = self.peek_header().await?;
trace!("waiting for full segment buf");
let segment_size = HEADER_LEN + header.payload_len as usize;
self.cancellable_read(segment_size).await?;
trace!("draining segment buffer");
let segment = self.1.drain(..segment_size);
let payload = segment.skip(HEADER_LEN).collect();
Ok((header.protocol, payload))
}
pub async fn write_segment(
&mut self,
protocol: u16,
clock: &Instant,
payload: &[u8],
) -> Result<(), std::io::Error> {
let header = Header {
protocol,
timestamp: clock.elapsed().as_micros() as u32,
payload_len: payload.len() as u16,
};
let buf: [u8; 8] = header.into();
self.0.write_all(&buf).await?;
self.0.write_all(payload).await?;
self.0.flush().await?;
Ok(())
}
}

View file

@ -5,12 +5,11 @@ use tokio::task::JoinHandle;
use tracing::{debug, error};
use crate::{
bearer,
miniprotocols::{
blockfetch, chainsync, handshake, localstate, PROTOCOL_N2C_CHAIN_SYNC,
PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY,
},
plexer,
multiplexer::{self, Bearer},
};
#[derive(Debug, Error)]
@ -35,11 +34,11 @@ pub struct PeerClient {
impl PeerClient {
pub async fn connect(address: &str, magic: u64) -> Result<Self, Error> {
debug!("connecting");
let bearer = bearer::Bearer::connect_tcp(address)
let bearer = Bearer::connect_tcp(address)
.await
.map_err(Error::ConnectFailure)?;
let mut plexer = plexer::Plexer::new(bearer);
let mut plexer = multiplexer::Plexer::new(bearer);
let channel0 = plexer.subscribe_client(0);
let channel2 = plexer.subscribe_client(2);
@ -92,11 +91,11 @@ impl NodeClient {
pub async fn connect(path: impl AsRef<Path>, magic: u64) -> Result<Self, Error> {
debug!("connecting");
let bearer = bearer::Bearer::connect_unix(path)
let bearer = Bearer::connect_unix(path)
.await
.map_err(Error::ConnectFailure)?;
let mut plexer = plexer::Plexer::new(bearer);
let mut plexer = multiplexer::Plexer::new(bearer);
let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE);
let cs_channel = plexer.subscribe_client(PROTOCOL_N2C_CHAIN_SYNC);

View file

@ -1,4 +1,3 @@
pub mod bearer;
pub mod facades;
pub mod miniprotocols;
pub mod plexer;
pub mod multiplexer;

View file

@ -2,7 +2,7 @@ use thiserror::Error;
use tracing::{debug, info, warn};
use crate::miniprotocols::common::Point;
use crate::plexer;
use crate::multiplexer;
use super::{Message, State};
@ -24,7 +24,7 @@ pub enum Error {
NoBlocks,
#[error("error while sending or receiving data through the multiplexer")]
Plexer(plexer::Error),
Plexer(multiplexer::Error),
}
pub type Body = Vec<u8>;
@ -33,11 +33,11 @@ pub type Range = (Point, Point);
pub type HasBlocks = Option<()>;
pub struct Client(State, plexer::ChannelBuffer);
pub struct Client(State, multiplexer::ChannelBuffer);
impl Client {
pub fn new(channel: plexer::AgentChannel) -> Self {
Self(State::Idle, plexer::ChannelBuffer::new(channel))
pub fn new(channel: multiplexer::AgentChannel) -> Self {
Self(State::Idle, multiplexer::ChannelBuffer::new(channel))
}
pub fn state(&self) -> &State {

View file

@ -4,7 +4,7 @@ use thiserror::Error;
use tracing::debug;
use crate::miniprotocols::Point;
use crate::plexer;
use crate::multiplexer;
use super::{BlockContent, HeaderContent, Message, State, Tip};
@ -26,7 +26,7 @@ pub enum Error {
IntersectionNotFound,
#[error("error while sending or receiving data through the channel")]
Plexer(plexer::Error),
Plexer(multiplexer::Error),
}
pub type IntersectResponse = (Option<Point>, Tip);
@ -38,7 +38,7 @@ pub enum NextResponse<CONTENT> {
Await,
}
pub struct Client<O>(State, plexer::ChannelBuffer, PhantomData<O>)
pub struct Client<O>(State, multiplexer::ChannelBuffer, PhantomData<O>)
where
Message<O>: Fragment;
@ -46,10 +46,10 @@ impl<O> Client<O>
where
Message<O>: Fragment,
{
pub fn new(channel: plexer::AgentChannel) -> Self {
pub fn new(channel: multiplexer::AgentChannel) -> Self {
Self(
State::Idle,
plexer::ChannelBuffer::new(channel),
multiplexer::ChannelBuffer::new(channel),
PhantomData {},
)
}

View file

@ -3,7 +3,7 @@ use std::marker::PhantomData;
use tracing::debug;
use super::{Error, Message, RefuseReason, State, VersionNumber, VersionTable};
use crate::plexer;
use crate::multiplexer;
#[derive(Debug)]
pub enum Confirmation<D> {
@ -11,17 +11,17 @@ pub enum Confirmation<D> {
Rejected(RefuseReason),
}
pub struct Client<D>(State, plexer::ChannelBuffer, PhantomData<D>);
pub struct Client<D>(State, multiplexer::ChannelBuffer, PhantomData<D>);
impl<D> Client<D>
where
D: std::fmt::Debug + Clone,
Message<D>: Fragment,
{
pub fn new(channel: plexer::AgentChannel) -> Self {
pub fn new(channel: multiplexer::AgentChannel) -> Self {
Self(
State::Propose,
plexer::ChannelBuffer::new(channel),
multiplexer::ChannelBuffer::new(channel),
PhantomData {},
)
}
@ -122,7 +122,7 @@ where
self.recv_while_confirm().await
}
pub fn unwrap(self) -> plexer::AgentChannel {
pub fn unwrap(self) -> multiplexer::AgentChannel {
self.1.unwrap()
}
}

View file

@ -3,7 +3,7 @@ use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder};
use std::{collections::HashMap, fmt::Debug};
use thiserror::*;
use crate::plexer;
use crate::multiplexer;
#[derive(Error, Debug)]
pub enum Error {
@ -20,7 +20,7 @@ pub enum Error {
InvalidOutbound,
#[error("error while sending or receiving data through the channel")]
Plexer(plexer::Error),
Plexer(multiplexer::Error),
}
#[derive(Debug, Clone)]

View file

@ -3,19 +3,19 @@ use std::marker::PhantomData;
use pallas_codec::Fragment;
use super::{Error, Message, RefuseReason, State, VersionNumber, VersionTable};
use crate::plexer;
use crate::multiplexer;
pub struct Server<D>(State, plexer::ChannelBuffer, PhantomData<D>);
pub struct Server<D>(State, multiplexer::ChannelBuffer, PhantomData<D>);
impl<D> Server<D>
where
D: std::fmt::Debug + Clone,
Message<D>: Fragment,
{
pub fn new(channel: plexer::AgentChannel) -> Self {
pub fn new(channel: multiplexer::AgentChannel) -> Self {
Self(
State::Propose,
plexer::ChannelBuffer::new(channel),
multiplexer::ChannelBuffer::new(channel),
PhantomData {},
)
}
@ -109,7 +109,7 @@ where
Ok(())
}
pub fn unwrap(self) -> plexer::AgentChannel {
pub fn unwrap(self) -> multiplexer::AgentChannel {
self.1.unwrap()
}
}

View file

@ -7,7 +7,7 @@ use thiserror::*;
use super::{AcquireFailure, Message, Query, State};
use crate::miniprotocols::Point;
use crate::plexer;
use crate::multiplexer;
#[derive(Error, Debug)]
pub enum Error {
@ -24,7 +24,7 @@ pub enum Error {
#[error("failure acquiring point, too old")]
AcquirePointTooOld,
#[error("error while sending or receiving data through the channel")]
Plexer(plexer::Error),
Plexer(multiplexer::Error),
}
impl From<AcquireFailure> for Error {
@ -36,7 +36,7 @@ impl From<AcquireFailure> for Error {
}
}
pub struct Client<Q>(State, plexer::ChannelBuffer, PhantomData<Q>)
pub struct Client<Q>(State, multiplexer::ChannelBuffer, PhantomData<Q>)
where
Q: Query,
Message<Q>: Fragment;
@ -46,10 +46,10 @@ where
Q: Query,
Message<Q>: Fragment,
{
pub fn new(channel: plexer::AgentChannel) -> Self {
pub fn new(channel: multiplexer::AgentChannel) -> Self {
Self(
State::Idle,
plexer::ChannelBuffer::new(channel),
multiplexer::ChannelBuffer::new(channel),
PhantomData {},
)
}

View file

@ -2,7 +2,7 @@ use std::fmt::Debug;
use thiserror::*;
use super::protocol::*;
use crate::plexer;
use crate::multiplexer;
#[derive(Error, Debug)]
pub enum Error {
@ -19,14 +19,14 @@ pub enum Error {
InvalidOutbound,
#[error("error while sending or receiving data through the channel")]
Plexer(plexer::Error),
Plexer(multiplexer::Error),
}
pub struct Client(State, plexer::ChannelBuffer);
pub struct Client(State, multiplexer::ChannelBuffer);
impl Client {
pub fn new(channel: plexer::AgentChannel) -> Self {
Self(State::Idle, plexer::ChannelBuffer::new(channel))
pub fn new(channel: multiplexer::AgentChannel) -> Self {
Self(State::Idle, multiplexer::ChannelBuffer::new(channel))
}
pub fn state(&self) -> &State {

View file

@ -1,6 +1,6 @@
use std::marker::PhantomData;
use crate::plexer;
use crate::multiplexer;
use pallas_codec::Fragment;
use super::{
@ -18,7 +18,7 @@ pub enum Request<TxId> {
/// to another server
pub struct GenericClient<TxId, TxBody>(
State,
plexer::ChannelBuffer,
multiplexer::ChannelBuffer,
PhantomData<TxId>,
PhantomData<TxBody>,
)
@ -32,10 +32,10 @@ impl<TxId, TxBody> GenericClient<TxId, TxBody>
where
Message<TxId, TxBody>: Fragment,
{
pub fn new(channel: plexer::AgentChannel) -> Self {
pub fn new(channel: multiplexer::AgentChannel) -> Self {
Self(
State::Init,
plexer::ChannelBuffer::new(channel),
multiplexer::ChannelBuffer::new(channel),
PhantomData {},
PhantomData {},
)

View file

@ -1,6 +1,6 @@
use thiserror::Error;
use crate::plexer;
use crate::multiplexer;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum State {
@ -47,7 +47,7 @@ pub enum Error {
AlreadyInitialized,
#[error("error while sending or receiving data through the channel")]
Plexer(plexer::Error),
Plexer(multiplexer::Error),
}
#[derive(Debug)]

View file

@ -6,7 +6,7 @@ use super::{
protocol::{Blocking, Error, Message, State, TxCount, TxIdAndSize},
EraTxBody, EraTxId,
};
use crate::plexer;
use crate::multiplexer;
pub enum Reply<TxId, TxBody> {
TxIds(Vec<TxIdAndSize<TxId>>),
@ -18,7 +18,7 @@ pub enum Reply<TxId, TxBody> {
/// and receive transactions from a client
pub struct GenericServer<TxId, TxBody>(
State,
plexer::ChannelBuffer,
multiplexer::ChannelBuffer,
PhantomData<TxId>,
PhantomData<TxBody>,
)
@ -32,10 +32,10 @@ impl<TxId, TxBody> GenericServer<TxId, TxBody>
where
Message<TxId, TxBody>: Fragment,
{
pub fn new(channel: plexer::AgentChannel) -> Self {
pub fn new(channel: multiplexer::AgentChannel) -> Self {
Self(
State::Init,
plexer::ChannelBuffer::new(channel),
multiplexer::ChannelBuffer::new(channel),
PhantomData {},
PhantomData {},
)

View file

@ -1,13 +1,120 @@
use byteorder::{ByteOrder, NetworkEndian};
use pallas_codec::{minicbor, Fragment};
use std::net::SocketAddr;
use std::path::Path;
use thiserror::Error;
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs, UnixStream};
use tokio::select;
use tokio::sync::mpsc::error::SendError;
use tokio::{select, time::Instant};
use tokio::time::Instant;
use tracing::{debug, error, trace};
use crate::bearer::{Bearer, Payload, Protocol, SegmentBuffer};
const HEADER_LEN: usize = 8;
#[derive(Error, Debug)]
pub type Timestamp = u32;
pub type Payload = Vec<u8>;
pub type Protocol = u16;
#[derive(Debug)]
pub struct Header {
pub protocol: Protocol,
pub timestamp: Timestamp,
pub payload_len: u16,
}
impl From<&[u8]> for Header {
fn from(value: &[u8]) -> Self {
let timestamp = NetworkEndian::read_u32(&value[0..4]);
let protocol = NetworkEndian::read_u16(&value[4..6]);
let payload_len = NetworkEndian::read_u16(&value[6..8]);
Self {
timestamp,
protocol,
payload_len,
}
}
}
impl From<Header> for [u8; 8] {
fn from(value: Header) -> Self {
let mut out = [0u8; 8];
NetworkEndian::write_u32(&mut out[0..4], value.timestamp);
NetworkEndian::write_u16(&mut out[4..6], value.protocol);
NetworkEndian::write_u16(&mut out[6..8], value.payload_len);
out
}
}
pub struct Segment {
pub header: Header,
pub payload: Payload,
}
pub enum Bearer {
Tcp(TcpStream),
Unix(UnixStream),
}
const BUFFER_LEN: usize = 1024 * 10;
impl Bearer {
pub async fn connect_tcp(addr: impl ToSocketAddrs) -> Result<Self, tokio::io::Error> {
let stream = TcpStream::connect(addr).await?;
Ok(Self::Tcp(stream))
}
pub async fn accept_tcp(listener: TcpListener) -> tokio::io::Result<(Self, SocketAddr)> {
let (stream, addr) = listener.accept().await?;
Ok((Self::Tcp(stream), addr))
}
pub async fn connect_unix(path: impl AsRef<Path>) -> Result<Self, tokio::io::Error> {
let stream = UnixStream::connect(path).await?;
Ok(Self::Unix(stream))
}
pub async fn readable(&self) -> tokio::io::Result<()> {
match self {
Bearer::Tcp(x) => x.readable().await,
Bearer::Unix(x) => x.readable().await,
}
}
fn try_read(&mut self, buf: &mut [u8]) -> tokio::io::Result<usize> {
match self {
Bearer::Tcp(x) => x.try_read(buf),
Bearer::Unix(x) => x.try_read(buf),
}
}
async fn write_all(&mut self, buf: &[u8]) -> tokio::io::Result<()> {
match self {
Bearer::Tcp(x) => x.write_all(buf).await,
Bearer::Unix(x) => x.write_all(buf).await,
}
}
async fn flush(&mut self) -> tokio::io::Result<()> {
match self {
Bearer::Tcp(x) => x.flush().await,
Bearer::Unix(x) => x.flush().await,
}
}
}
#[derive(Debug, Error)]
pub enum Error {
#[error("no data available in bearer to complete segment")]
EmptyBearer,
#[error("bearer I/O error")]
BearerIo(tokio::io::Error),
#[error("failure to encode channel message")]
Decoding(String),
@ -25,20 +132,103 @@ pub enum Error {
#[error("plexer failed to mux chunk")]
PlexerMux,
}
#[error("bearer IO error")]
Bearer(tokio::io::Error),
pub struct SegmentBuffer(Bearer, Vec<u8>);
impl SegmentBuffer {
pub fn new(bearer: Bearer) -> Self {
Self(bearer, Vec::with_capacity(BUFFER_LEN))
}
/// Cancel-safe loop that reads from bearer until certain len
async fn cancellable_read(&mut self, required: usize) -> Result<(), Error> {
loop {
self.0.readable().await.map_err(Error::BearerIo)?;
trace!("bearer is readable");
let remaining = required - self.1.len();
let mut buf = vec![0u8; remaining];
match self.0.try_read(&mut buf) {
Ok(0) => break Err(Error::EmptyBearer),
Ok(n) => {
trace!(n, "found data on bearer");
self.1.extend_from_slice(&buf[0..n]);
if self.1.len() >= required {
break Ok(());
}
}
Err(ref e) if e.kind() == tokio::io::ErrorKind::WouldBlock => {
trace!("reading from bearer would block");
continue;
}
Err(e) => {
return Err(Error::BearerIo(e));
}
}
}
}
/// Peek the available data in search for a frame header
async fn peek_header(&mut self) -> Result<Header, Error> {
trace!("waiting for header buf");
self.cancellable_read(HEADER_LEN).await?;
trace!("found enough data for header");
let header = &self.1[..HEADER_LEN];
Ok(Header::from(header))
}
// Cancel-safe read of a full segment from the bearer
pub async fn read_segment(&mut self) -> Result<(Protocol, Payload), Error> {
let header = self.peek_header().await?;
trace!("waiting for full segment buf");
let segment_size = HEADER_LEN + header.payload_len as usize;
self.cancellable_read(segment_size).await?;
trace!("draining segment buffer");
let segment = self.1.drain(..segment_size);
let payload = segment.skip(HEADER_LEN).collect();
Ok((header.protocol, payload))
}
pub async fn write_segment(
&mut self,
protocol: u16,
clock: &Instant,
payload: &[u8],
) -> Result<(), std::io::Error> {
let header = Header {
protocol,
timestamp: clock.elapsed().as_micros() as u32,
payload_len: payload.len() as u16,
};
let buf: [u8; 8] = header.into();
self.0.write_all(&buf).await?;
self.0.write_all(payload).await?;
self.0.flush().await?;
Ok(())
}
}
pub struct AgentChannel {
enqueue_protocol: crate::bearer::Protocol,
dequeue_protocol: crate::bearer::Protocol,
enqueue_protocol: Protocol,
dequeue_protocol: Protocol,
to_plexer: tokio::sync::mpsc::Sender<(Protocol, Payload)>,
from_plexer: tokio::sync::broadcast::Receiver<(Protocol, Payload)>,
}
impl AgentChannel {
fn for_client(protocol: crate::bearer::Protocol, ingress: &Ingress, egress: &Egress) -> Self {
fn for_client(protocol: Protocol, ingress: &Ingress, egress: &Egress) -> Self {
Self {
enqueue_protocol: protocol,
dequeue_protocol: protocol ^ 0x8000,
@ -47,7 +237,7 @@ impl AgentChannel {
}
}
fn for_server(protocol: crate::bearer::Protocol, ingress: &Ingress, egress: &Egress) -> Self {
fn for_server(protocol: Protocol, ingress: &Ingress, egress: &Egress) -> Self {
Self {
enqueue_protocol: protocol ^ 0x8000,
dequeue_protocol: protocol,

View file

@ -1,6 +1,6 @@
use std::net::{Ipv4Addr, SocketAddrV4};
use pallas_network::{bearer::Bearer, plexer::Plexer};
use pallas_network::multiplexer::{Bearer, Plexer};
use rand::{distributions::Uniform, Rng};
use tokio::net::TcpListener;

View file

@ -30,3 +30,6 @@ pub use pallas_crypto as crypto;
#[doc(inline)]
pub use pallas_codec as codec;
#[doc(inline)]
pub use pallas_upstream as upstream;