From e46b152786a720817a5c62cfbce27ccbe2c6d6e0 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Tue, 11 Apr 2023 01:21:11 +0200 Subject: [PATCH] refactor: Improve network module naming (#245) --- pallas-network/src/bearer.rs | 201 ----------------- pallas-network/src/facades.rs | 11 +- pallas-network/src/lib.rs | 3 +- .../src/miniprotocols/blockfetch/client.rs | 10 +- .../src/miniprotocols/chainsync/client.rs | 10 +- .../src/miniprotocols/handshake/client.rs | 10 +- .../src/miniprotocols/handshake/protocol.rs | 4 +- .../src/miniprotocols/handshake/server.rs | 10 +- .../src/miniprotocols/localstate/client.rs | 10 +- .../src/miniprotocols/txmonitor/client.rs | 10 +- .../src/miniprotocols/txsubmission/client.rs | 8 +- .../miniprotocols/txsubmission/protocol.rs | 4 +- .../src/miniprotocols/txsubmission/server.rs | 8 +- .../src/{plexer.rs => multiplexer.rs} | 208 +++++++++++++++++- pallas-network/tests/plexer.rs | 2 +- pallas/src/lib.rs | 3 + 16 files changed, 251 insertions(+), 261 deletions(-) delete mode 100644 pallas-network/src/bearer.rs rename pallas-network/src/{plexer.rs => multiplexer.rs} (59%) diff --git a/pallas-network/src/bearer.rs b/pallas-network/src/bearer.rs deleted file mode 100644 index bebf602..0000000 --- a/pallas-network/src/bearer.rs +++ /dev/null @@ -1,201 +0,0 @@ -use std::net::SocketAddr; -use std::path::Path; - -use byteorder::{ByteOrder, NetworkEndian}; -use thiserror::Error; -use tokio::io::AsyncWriteExt; -use tokio::net::{TcpListener, TcpStream, ToSocketAddrs, UnixStream}; -use tokio::time::Instant; -use tracing::trace; - -const HEADER_LEN: usize = 8; - -pub type Timestamp = u32; - -pub type Payload = Vec; - -pub type Protocol = u16; - -#[derive(Debug)] -pub struct Header { - pub protocol: Protocol, - pub timestamp: Timestamp, - pub payload_len: u16, -} - -impl From<&[u8]> for Header { - fn from(value: &[u8]) -> Self { - let timestamp = NetworkEndian::read_u32(&value[0..4]); - let protocol = NetworkEndian::read_u16(&value[4..6]); - let payload_len = NetworkEndian::read_u16(&value[6..8]); - - Self { - timestamp, - protocol, - payload_len, - } - } -} - -impl From
for [u8; 8] { - fn from(value: Header) -> Self { - let mut out = [0u8; 8]; - NetworkEndian::write_u32(&mut out[0..4], value.timestamp); - NetworkEndian::write_u16(&mut out[4..6], value.protocol); - NetworkEndian::write_u16(&mut out[6..8], value.payload_len); - - out - } -} - -pub struct Segment { - pub header: Header, - pub payload: Payload, -} - -pub enum Bearer { - Tcp(TcpStream), - Unix(UnixStream), -} - -const BUFFER_LEN: usize = 1024 * 10; - -impl Bearer { - pub async fn connect_tcp(addr: impl ToSocketAddrs) -> Result { - let stream = TcpStream::connect(addr).await?; - Ok(Self::Tcp(stream)) - } - - pub async fn accept_tcp(listener: TcpListener) -> tokio::io::Result<(Self, SocketAddr)> { - let (stream, addr) = listener.accept().await?; - Ok((Self::Tcp(stream), addr)) - } - - pub async fn connect_unix(path: impl AsRef) -> Result { - let stream = UnixStream::connect(path).await?; - Ok(Self::Unix(stream)) - } - - pub async fn readable(&self) -> tokio::io::Result<()> { - match self { - Bearer::Tcp(x) => x.readable().await, - Bearer::Unix(x) => x.readable().await, - } - } - - fn try_read(&mut self, buf: &mut [u8]) -> tokio::io::Result { - match self { - Bearer::Tcp(x) => x.try_read(buf), - Bearer::Unix(x) => x.try_read(buf), - } - } - - async fn write_all(&mut self, buf: &[u8]) -> tokio::io::Result<()> { - match self { - Bearer::Tcp(x) => x.write_all(buf).await, - Bearer::Unix(x) => x.write_all(buf).await, - } - } - - async fn flush(&mut self) -> tokio::io::Result<()> { - match self { - Bearer::Tcp(x) => x.flush().await, - Bearer::Unix(x) => x.flush().await, - } - } -} - -#[derive(Debug, Error)] -pub enum Error { - #[error("no data available in bearer to complete segment")] - NoData, - - #[error("unexpected I/O error")] - Io(#[source] tokio::io::Error), -} - -pub struct SegmentBuffer(Bearer, Vec); - -impl SegmentBuffer { - pub fn new(bearer: Bearer) -> Self { - Self(bearer, Vec::with_capacity(BUFFER_LEN)) - } - - /// Cancel-safe loop that reads from bearer until certain len - async fn cancellable_read(&mut self, required: usize) -> Result<(), Error> { - loop { - self.0.readable().await.map_err(Error::Io)?; - trace!("bearer is readable"); - - let remaining = required - self.1.len(); - let mut buf = vec![0u8; remaining]; - - match self.0.try_read(&mut buf) { - Ok(0) => break Err(Error::NoData), - Ok(n) => { - trace!(n, "found data on bearer"); - self.1.extend_from_slice(&buf[0..n]); - - if self.1.len() >= required { - break Ok(()); - } - } - Err(ref e) if e.kind() == tokio::io::ErrorKind::WouldBlock => { - trace!("reading from bearer would block"); - continue; - } - Err(e) => { - return Err(Error::Io(e)); - } - } - } - } - - /// Peek the available data in search for a frame header - async fn peek_header(&mut self) -> Result { - trace!("waiting for header buf"); - self.cancellable_read(HEADER_LEN).await?; - - trace!("found enough data for header"); - let header = &self.1[..HEADER_LEN]; - - Ok(Header::from(header)) - } - - // Cancel-safe read of a full segment from the bearer - pub async fn read_segment(&mut self) -> Result<(Protocol, Payload), Error> { - let header = self.peek_header().await?; - - trace!("waiting for full segment buf"); - let segment_size = HEADER_LEN + header.payload_len as usize; - - self.cancellable_read(segment_size).await?; - - trace!("draining segment buffer"); - let segment = self.1.drain(..segment_size); - let payload = segment.skip(HEADER_LEN).collect(); - - Ok((header.protocol, payload)) - } - - pub async fn write_segment( - &mut self, - protocol: u16, - clock: &Instant, - payload: &[u8], - ) -> Result<(), std::io::Error> { - let header = Header { - protocol, - timestamp: clock.elapsed().as_micros() as u32, - payload_len: payload.len() as u16, - }; - - let buf: [u8; 8] = header.into(); - self.0.write_all(&buf).await?; - self.0.write_all(payload).await?; - - self.0.flush().await?; - - Ok(()) - } -} diff --git a/pallas-network/src/facades.rs b/pallas-network/src/facades.rs index 5bbba2b..c2b1984 100644 --- a/pallas-network/src/facades.rs +++ b/pallas-network/src/facades.rs @@ -5,12 +5,11 @@ use tokio::task::JoinHandle; use tracing::{debug, error}; use crate::{ - bearer, miniprotocols::{ blockfetch, chainsync, handshake, localstate, PROTOCOL_N2C_CHAIN_SYNC, PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY, }, - plexer, + multiplexer::{self, Bearer}, }; #[derive(Debug, Error)] @@ -35,11 +34,11 @@ pub struct PeerClient { impl PeerClient { pub async fn connect(address: &str, magic: u64) -> Result { debug!("connecting"); - let bearer = bearer::Bearer::connect_tcp(address) + let bearer = Bearer::connect_tcp(address) .await .map_err(Error::ConnectFailure)?; - let mut plexer = plexer::Plexer::new(bearer); + let mut plexer = multiplexer::Plexer::new(bearer); let channel0 = plexer.subscribe_client(0); let channel2 = plexer.subscribe_client(2); @@ -92,11 +91,11 @@ impl NodeClient { pub async fn connect(path: impl AsRef, magic: u64) -> Result { debug!("connecting"); - let bearer = bearer::Bearer::connect_unix(path) + let bearer = Bearer::connect_unix(path) .await .map_err(Error::ConnectFailure)?; - let mut plexer = plexer::Plexer::new(bearer); + let mut plexer = multiplexer::Plexer::new(bearer); let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE); let cs_channel = plexer.subscribe_client(PROTOCOL_N2C_CHAIN_SYNC); diff --git a/pallas-network/src/lib.rs b/pallas-network/src/lib.rs index 750af8f..3647f9f 100644 --- a/pallas-network/src/lib.rs +++ b/pallas-network/src/lib.rs @@ -1,4 +1,3 @@ -pub mod bearer; pub mod facades; pub mod miniprotocols; -pub mod plexer; +pub mod multiplexer; diff --git a/pallas-network/src/miniprotocols/blockfetch/client.rs b/pallas-network/src/miniprotocols/blockfetch/client.rs index 09a4388..66b1a79 100644 --- a/pallas-network/src/miniprotocols/blockfetch/client.rs +++ b/pallas-network/src/miniprotocols/blockfetch/client.rs @@ -2,7 +2,7 @@ use thiserror::Error; use tracing::{debug, info, warn}; use crate::miniprotocols::common::Point; -use crate::plexer; +use crate::multiplexer; use super::{Message, State}; @@ -24,7 +24,7 @@ pub enum Error { NoBlocks, #[error("error while sending or receiving data through the multiplexer")] - Plexer(plexer::Error), + Plexer(multiplexer::Error), } pub type Body = Vec; @@ -33,11 +33,11 @@ pub type Range = (Point, Point); pub type HasBlocks = Option<()>; -pub struct Client(State, plexer::ChannelBuffer); +pub struct Client(State, multiplexer::ChannelBuffer); impl Client { - pub fn new(channel: plexer::AgentChannel) -> Self { - Self(State::Idle, plexer::ChannelBuffer::new(channel)) + pub fn new(channel: multiplexer::AgentChannel) -> Self { + Self(State::Idle, multiplexer::ChannelBuffer::new(channel)) } pub fn state(&self) -> &State { diff --git a/pallas-network/src/miniprotocols/chainsync/client.rs b/pallas-network/src/miniprotocols/chainsync/client.rs index 6b985d9..b521db6 100644 --- a/pallas-network/src/miniprotocols/chainsync/client.rs +++ b/pallas-network/src/miniprotocols/chainsync/client.rs @@ -4,7 +4,7 @@ use thiserror::Error; use tracing::debug; use crate::miniprotocols::Point; -use crate::plexer; +use crate::multiplexer; use super::{BlockContent, HeaderContent, Message, State, Tip}; @@ -26,7 +26,7 @@ pub enum Error { IntersectionNotFound, #[error("error while sending or receiving data through the channel")] - Plexer(plexer::Error), + Plexer(multiplexer::Error), } pub type IntersectResponse = (Option, Tip); @@ -38,7 +38,7 @@ pub enum NextResponse { Await, } -pub struct Client(State, plexer::ChannelBuffer, PhantomData) +pub struct Client(State, multiplexer::ChannelBuffer, PhantomData) where Message: Fragment; @@ -46,10 +46,10 @@ impl Client where Message: Fragment, { - pub fn new(channel: plexer::AgentChannel) -> Self { + pub fn new(channel: multiplexer::AgentChannel) -> Self { Self( State::Idle, - plexer::ChannelBuffer::new(channel), + multiplexer::ChannelBuffer::new(channel), PhantomData {}, ) } diff --git a/pallas-network/src/miniprotocols/handshake/client.rs b/pallas-network/src/miniprotocols/handshake/client.rs index cb15321..0a795ea 100644 --- a/pallas-network/src/miniprotocols/handshake/client.rs +++ b/pallas-network/src/miniprotocols/handshake/client.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use tracing::debug; use super::{Error, Message, RefuseReason, State, VersionNumber, VersionTable}; -use crate::plexer; +use crate::multiplexer; #[derive(Debug)] pub enum Confirmation { @@ -11,17 +11,17 @@ pub enum Confirmation { Rejected(RefuseReason), } -pub struct Client(State, plexer::ChannelBuffer, PhantomData); +pub struct Client(State, multiplexer::ChannelBuffer, PhantomData); impl Client where D: std::fmt::Debug + Clone, Message: Fragment, { - pub fn new(channel: plexer::AgentChannel) -> Self { + pub fn new(channel: multiplexer::AgentChannel) -> Self { Self( State::Propose, - plexer::ChannelBuffer::new(channel), + multiplexer::ChannelBuffer::new(channel), PhantomData {}, ) } @@ -122,7 +122,7 @@ where self.recv_while_confirm().await } - pub fn unwrap(self) -> plexer::AgentChannel { + pub fn unwrap(self) -> multiplexer::AgentChannel { self.1.unwrap() } } diff --git a/pallas-network/src/miniprotocols/handshake/protocol.rs b/pallas-network/src/miniprotocols/handshake/protocol.rs index 845c283..1ab2a28 100644 --- a/pallas-network/src/miniprotocols/handshake/protocol.rs +++ b/pallas-network/src/miniprotocols/handshake/protocol.rs @@ -3,7 +3,7 @@ use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}; use std::{collections::HashMap, fmt::Debug}; use thiserror::*; -use crate::plexer; +use crate::multiplexer; #[derive(Error, Debug)] pub enum Error { @@ -20,7 +20,7 @@ pub enum Error { InvalidOutbound, #[error("error while sending or receiving data through the channel")] - Plexer(plexer::Error), + Plexer(multiplexer::Error), } #[derive(Debug, Clone)] diff --git a/pallas-network/src/miniprotocols/handshake/server.rs b/pallas-network/src/miniprotocols/handshake/server.rs index e29968e..cd05a5b 100644 --- a/pallas-network/src/miniprotocols/handshake/server.rs +++ b/pallas-network/src/miniprotocols/handshake/server.rs @@ -3,19 +3,19 @@ use std::marker::PhantomData; use pallas_codec::Fragment; use super::{Error, Message, RefuseReason, State, VersionNumber, VersionTable}; -use crate::plexer; +use crate::multiplexer; -pub struct Server(State, plexer::ChannelBuffer, PhantomData); +pub struct Server(State, multiplexer::ChannelBuffer, PhantomData); impl Server where D: std::fmt::Debug + Clone, Message: Fragment, { - pub fn new(channel: plexer::AgentChannel) -> Self { + pub fn new(channel: multiplexer::AgentChannel) -> Self { Self( State::Propose, - plexer::ChannelBuffer::new(channel), + multiplexer::ChannelBuffer::new(channel), PhantomData {}, ) } @@ -109,7 +109,7 @@ where Ok(()) } - pub fn unwrap(self) -> plexer::AgentChannel { + pub fn unwrap(self) -> multiplexer::AgentChannel { self.1.unwrap() } } diff --git a/pallas-network/src/miniprotocols/localstate/client.rs b/pallas-network/src/miniprotocols/localstate/client.rs index 21af2e6..ef5e2cc 100644 --- a/pallas-network/src/miniprotocols/localstate/client.rs +++ b/pallas-network/src/miniprotocols/localstate/client.rs @@ -7,7 +7,7 @@ use thiserror::*; use super::{AcquireFailure, Message, Query, State}; use crate::miniprotocols::Point; -use crate::plexer; +use crate::multiplexer; #[derive(Error, Debug)] pub enum Error { @@ -24,7 +24,7 @@ pub enum Error { #[error("failure acquiring point, too old")] AcquirePointTooOld, #[error("error while sending or receiving data through the channel")] - Plexer(plexer::Error), + Plexer(multiplexer::Error), } impl From for Error { @@ -36,7 +36,7 @@ impl From for Error { } } -pub struct Client(State, plexer::ChannelBuffer, PhantomData) +pub struct Client(State, multiplexer::ChannelBuffer, PhantomData) where Q: Query, Message: Fragment; @@ -46,10 +46,10 @@ where Q: Query, Message: Fragment, { - pub fn new(channel: plexer::AgentChannel) -> Self { + pub fn new(channel: multiplexer::AgentChannel) -> Self { Self( State::Idle, - plexer::ChannelBuffer::new(channel), + multiplexer::ChannelBuffer::new(channel), PhantomData {}, ) } diff --git a/pallas-network/src/miniprotocols/txmonitor/client.rs b/pallas-network/src/miniprotocols/txmonitor/client.rs index 79eaec9..f988873 100644 --- a/pallas-network/src/miniprotocols/txmonitor/client.rs +++ b/pallas-network/src/miniprotocols/txmonitor/client.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use thiserror::*; use super::protocol::*; -use crate::plexer; +use crate::multiplexer; #[derive(Error, Debug)] pub enum Error { @@ -19,14 +19,14 @@ pub enum Error { InvalidOutbound, #[error("error while sending or receiving data through the channel")] - Plexer(plexer::Error), + Plexer(multiplexer::Error), } -pub struct Client(State, plexer::ChannelBuffer); +pub struct Client(State, multiplexer::ChannelBuffer); impl Client { - pub fn new(channel: plexer::AgentChannel) -> Self { - Self(State::Idle, plexer::ChannelBuffer::new(channel)) + pub fn new(channel: multiplexer::AgentChannel) -> Self { + Self(State::Idle, multiplexer::ChannelBuffer::new(channel)) } pub fn state(&self) -> &State { diff --git a/pallas-network/src/miniprotocols/txsubmission/client.rs b/pallas-network/src/miniprotocols/txsubmission/client.rs index 1d7577f..c669660 100644 --- a/pallas-network/src/miniprotocols/txsubmission/client.rs +++ b/pallas-network/src/miniprotocols/txsubmission/client.rs @@ -1,6 +1,6 @@ use std::marker::PhantomData; -use crate::plexer; +use crate::multiplexer; use pallas_codec::Fragment; use super::{ @@ -18,7 +18,7 @@ pub enum Request { /// to another server pub struct GenericClient( State, - plexer::ChannelBuffer, + multiplexer::ChannelBuffer, PhantomData, PhantomData, ) @@ -32,10 +32,10 @@ impl GenericClient where Message: Fragment, { - pub fn new(channel: plexer::AgentChannel) -> Self { + pub fn new(channel: multiplexer::AgentChannel) -> Self { Self( State::Init, - plexer::ChannelBuffer::new(channel), + multiplexer::ChannelBuffer::new(channel), PhantomData {}, PhantomData {}, ) diff --git a/pallas-network/src/miniprotocols/txsubmission/protocol.rs b/pallas-network/src/miniprotocols/txsubmission/protocol.rs index ba46a2b..1513e9a 100644 --- a/pallas-network/src/miniprotocols/txsubmission/protocol.rs +++ b/pallas-network/src/miniprotocols/txsubmission/protocol.rs @@ -1,6 +1,6 @@ use thiserror::Error; -use crate::plexer; +use crate::multiplexer; #[derive(Debug, PartialEq, Eq, Clone)] pub enum State { @@ -47,7 +47,7 @@ pub enum Error { AlreadyInitialized, #[error("error while sending or receiving data through the channel")] - Plexer(plexer::Error), + Plexer(multiplexer::Error), } #[derive(Debug)] diff --git a/pallas-network/src/miniprotocols/txsubmission/server.rs b/pallas-network/src/miniprotocols/txsubmission/server.rs index eed9050..878b325 100644 --- a/pallas-network/src/miniprotocols/txsubmission/server.rs +++ b/pallas-network/src/miniprotocols/txsubmission/server.rs @@ -6,7 +6,7 @@ use super::{ protocol::{Blocking, Error, Message, State, TxCount, TxIdAndSize}, EraTxBody, EraTxId, }; -use crate::plexer; +use crate::multiplexer; pub enum Reply { TxIds(Vec>), @@ -18,7 +18,7 @@ pub enum Reply { /// and receive transactions from a client pub struct GenericServer( State, - plexer::ChannelBuffer, + multiplexer::ChannelBuffer, PhantomData, PhantomData, ) @@ -32,10 +32,10 @@ impl GenericServer where Message: Fragment, { - pub fn new(channel: plexer::AgentChannel) -> Self { + pub fn new(channel: multiplexer::AgentChannel) -> Self { Self( State::Init, - plexer::ChannelBuffer::new(channel), + multiplexer::ChannelBuffer::new(channel), PhantomData {}, PhantomData {}, ) diff --git a/pallas-network/src/plexer.rs b/pallas-network/src/multiplexer.rs similarity index 59% rename from pallas-network/src/plexer.rs rename to pallas-network/src/multiplexer.rs index b77a1f8..733d9f2 100644 --- a/pallas-network/src/plexer.rs +++ b/pallas-network/src/multiplexer.rs @@ -1,13 +1,120 @@ +use byteorder::{ByteOrder, NetworkEndian}; use pallas_codec::{minicbor, Fragment}; +use std::net::SocketAddr; +use std::path::Path; use thiserror::Error; +use tokio::io::AsyncWriteExt; +use tokio::net::{TcpListener, TcpStream, ToSocketAddrs, UnixStream}; +use tokio::select; use tokio::sync::mpsc::error::SendError; -use tokio::{select, time::Instant}; +use tokio::time::Instant; use tracing::{debug, error, trace}; -use crate::bearer::{Bearer, Payload, Protocol, SegmentBuffer}; +const HEADER_LEN: usize = 8; -#[derive(Error, Debug)] +pub type Timestamp = u32; + +pub type Payload = Vec; + +pub type Protocol = u16; + +#[derive(Debug)] +pub struct Header { + pub protocol: Protocol, + pub timestamp: Timestamp, + pub payload_len: u16, +} + +impl From<&[u8]> for Header { + fn from(value: &[u8]) -> Self { + let timestamp = NetworkEndian::read_u32(&value[0..4]); + let protocol = NetworkEndian::read_u16(&value[4..6]); + let payload_len = NetworkEndian::read_u16(&value[6..8]); + + Self { + timestamp, + protocol, + payload_len, + } + } +} + +impl From
for [u8; 8] { + fn from(value: Header) -> Self { + let mut out = [0u8; 8]; + NetworkEndian::write_u32(&mut out[0..4], value.timestamp); + NetworkEndian::write_u16(&mut out[4..6], value.protocol); + NetworkEndian::write_u16(&mut out[6..8], value.payload_len); + + out + } +} + +pub struct Segment { + pub header: Header, + pub payload: Payload, +} + +pub enum Bearer { + Tcp(TcpStream), + Unix(UnixStream), +} + +const BUFFER_LEN: usize = 1024 * 10; + +impl Bearer { + pub async fn connect_tcp(addr: impl ToSocketAddrs) -> Result { + let stream = TcpStream::connect(addr).await?; + Ok(Self::Tcp(stream)) + } + + pub async fn accept_tcp(listener: TcpListener) -> tokio::io::Result<(Self, SocketAddr)> { + let (stream, addr) = listener.accept().await?; + Ok((Self::Tcp(stream), addr)) + } + + pub async fn connect_unix(path: impl AsRef) -> Result { + let stream = UnixStream::connect(path).await?; + Ok(Self::Unix(stream)) + } + + pub async fn readable(&self) -> tokio::io::Result<()> { + match self { + Bearer::Tcp(x) => x.readable().await, + Bearer::Unix(x) => x.readable().await, + } + } + + fn try_read(&mut self, buf: &mut [u8]) -> tokio::io::Result { + match self { + Bearer::Tcp(x) => x.try_read(buf), + Bearer::Unix(x) => x.try_read(buf), + } + } + + async fn write_all(&mut self, buf: &[u8]) -> tokio::io::Result<()> { + match self { + Bearer::Tcp(x) => x.write_all(buf).await, + Bearer::Unix(x) => x.write_all(buf).await, + } + } + + async fn flush(&mut self) -> tokio::io::Result<()> { + match self { + Bearer::Tcp(x) => x.flush().await, + Bearer::Unix(x) => x.flush().await, + } + } +} + +#[derive(Debug, Error)] pub enum Error { + #[error("no data available in bearer to complete segment")] + EmptyBearer, + + #[error("bearer I/O error")] + BearerIo(tokio::io::Error), + #[error("failure to encode channel message")] Decoding(String), @@ -25,20 +132,103 @@ pub enum Error { #[error("plexer failed to mux chunk")] PlexerMux, +} - #[error("bearer IO error")] - Bearer(tokio::io::Error), +pub struct SegmentBuffer(Bearer, Vec); + +impl SegmentBuffer { + pub fn new(bearer: Bearer) -> Self { + Self(bearer, Vec::with_capacity(BUFFER_LEN)) + } + + /// Cancel-safe loop that reads from bearer until certain len + async fn cancellable_read(&mut self, required: usize) -> Result<(), Error> { + loop { + self.0.readable().await.map_err(Error::BearerIo)?; + trace!("bearer is readable"); + + let remaining = required - self.1.len(); + let mut buf = vec![0u8; remaining]; + + match self.0.try_read(&mut buf) { + Ok(0) => break Err(Error::EmptyBearer), + Ok(n) => { + trace!(n, "found data on bearer"); + self.1.extend_from_slice(&buf[0..n]); + + if self.1.len() >= required { + break Ok(()); + } + } + Err(ref e) if e.kind() == tokio::io::ErrorKind::WouldBlock => { + trace!("reading from bearer would block"); + continue; + } + Err(e) => { + return Err(Error::BearerIo(e)); + } + } + } + } + + /// Peek the available data in search for a frame header + async fn peek_header(&mut self) -> Result { + trace!("waiting for header buf"); + self.cancellable_read(HEADER_LEN).await?; + + trace!("found enough data for header"); + let header = &self.1[..HEADER_LEN]; + + Ok(Header::from(header)) + } + + // Cancel-safe read of a full segment from the bearer + pub async fn read_segment(&mut self) -> Result<(Protocol, Payload), Error> { + let header = self.peek_header().await?; + + trace!("waiting for full segment buf"); + let segment_size = HEADER_LEN + header.payload_len as usize; + + self.cancellable_read(segment_size).await?; + + trace!("draining segment buffer"); + let segment = self.1.drain(..segment_size); + let payload = segment.skip(HEADER_LEN).collect(); + + Ok((header.protocol, payload)) + } + + pub async fn write_segment( + &mut self, + protocol: u16, + clock: &Instant, + payload: &[u8], + ) -> Result<(), std::io::Error> { + let header = Header { + protocol, + timestamp: clock.elapsed().as_micros() as u32, + payload_len: payload.len() as u16, + }; + + let buf: [u8; 8] = header.into(); + self.0.write_all(&buf).await?; + self.0.write_all(payload).await?; + + self.0.flush().await?; + + Ok(()) + } } pub struct AgentChannel { - enqueue_protocol: crate::bearer::Protocol, - dequeue_protocol: crate::bearer::Protocol, + enqueue_protocol: Protocol, + dequeue_protocol: Protocol, to_plexer: tokio::sync::mpsc::Sender<(Protocol, Payload)>, from_plexer: tokio::sync::broadcast::Receiver<(Protocol, Payload)>, } impl AgentChannel { - fn for_client(protocol: crate::bearer::Protocol, ingress: &Ingress, egress: &Egress) -> Self { + fn for_client(protocol: Protocol, ingress: &Ingress, egress: &Egress) -> Self { Self { enqueue_protocol: protocol, dequeue_protocol: protocol ^ 0x8000, @@ -47,7 +237,7 @@ impl AgentChannel { } } - fn for_server(protocol: crate::bearer::Protocol, ingress: &Ingress, egress: &Egress) -> Self { + fn for_server(protocol: Protocol, ingress: &Ingress, egress: &Egress) -> Self { Self { enqueue_protocol: protocol ^ 0x8000, dequeue_protocol: protocol, diff --git a/pallas-network/tests/plexer.rs b/pallas-network/tests/plexer.rs index 2f5f70c..2f03d96 100644 --- a/pallas-network/tests/plexer.rs +++ b/pallas-network/tests/plexer.rs @@ -1,6 +1,6 @@ use std::net::{Ipv4Addr, SocketAddrV4}; -use pallas_network::{bearer::Bearer, plexer::Plexer}; +use pallas_network::multiplexer::{Bearer, Plexer}; use rand::{distributions::Uniform, Rng}; use tokio::net::TcpListener; diff --git a/pallas/src/lib.rs b/pallas/src/lib.rs index 84bfa16..de88328 100644 --- a/pallas/src/lib.rs +++ b/pallas/src/lib.rs @@ -30,3 +30,6 @@ pub use pallas_crypto as crypto; #[doc(inline)] pub use pallas_codec as codec; + +#[doc(inline)] +pub use pallas_upstream as upstream;