From f67e010c763c3525af51e05d3d526fe9be33b28b Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sat, 4 Jun 2022 17:39:29 -0300 Subject: [PATCH] feat: Improve multiplexer ergonomics (#111) --- examples/block-download/src/main.rs | 18 +-- examples/n2c-miniprotocols/src/main.rs | 16 +-- examples/n2n-miniprotocols/src/main.rs | 20 +-- pallas-miniprotocols/Cargo.toml | 2 +- pallas-miniprotocols/src/chainsync/agents.rs | 4 +- .../src/chainsync/protocol.rs | 6 + pallas-miniprotocols/src/localstate/mod.rs | 2 +- pallas-miniprotocols/src/machines.rs | 32 +++-- pallas-miniprotocols/src/txsubmission/mod.rs | 2 +- pallas-multiplexer/Cargo.toml | 3 +- pallas-multiplexer/src/agents.rs | 9 +- pallas-multiplexer/src/bearers.rs | 98 +++++++++----- pallas-multiplexer/src/demux.rs | 38 ++---- pallas-multiplexer/src/lib.rs | 28 ++-- pallas-multiplexer/src/mux.rs | 32 +---- pallas-multiplexer/src/std.rs | 120 +++++++++++------- pallas-multiplexer/tests/integration.rs | 71 ++--------- pallas-primitives/src/alonzo/address.rs | 2 +- 18 files changed, 242 insertions(+), 261 deletions(-) diff --git a/examples/block-download/src/main.rs b/examples/block-download/src/main.rs index a7deda0..82000b7 100644 --- a/examples/block-download/src/main.rs +++ b/examples/block-download/src/main.rs @@ -1,17 +1,13 @@ -use net2::TcpStreamExt; - use pallas::network::{ miniprotocols::{ handshake::{n2n::VersionTable, Initiator}, run_agent, Point, MAINNET_MAGIC, }, - multiplexer::{spawn_demuxer, spawn_muxer, use_channel, StdPlexer}, + multiplexer::{bearers::Bearer, StdPlexer}, }; use pallas::network::miniprotocols::blockfetch::{BatchClient, Observer}; -use std::net::TcpStream; - #[derive(Debug)] struct BlockPrinter; @@ -26,16 +22,14 @@ impl Observer for BlockPrinter { fn main() { env_logger::init(); - let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); - bearer.set_nodelay(true).unwrap(); - bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); + let bearer = Bearer::connect_tcp("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); let mut plexer = StdPlexer::new(bearer); - let mut channel0 = use_channel(&mut plexer, 0); - let mut channel3 = use_channel(&mut plexer, 3); + let mut channel0 = plexer.use_channel(0); + let mut channel3 = plexer.use_channel(3); - spawn_muxer(plexer.muxer); - spawn_demuxer(plexer.demuxer); + plexer.muxer.spawn(); + plexer.demuxer.spawn(); let versions = VersionTable::v4_and_above(MAINNET_MAGIC); let _last = run_agent(Initiator::initial(versions), &mut channel0).unwrap(); diff --git a/examples/n2c-miniprotocols/src/main.rs b/examples/n2c-miniprotocols/src/main.rs index 2f79782..b1b5b7f 100644 --- a/examples/n2c-miniprotocols/src/main.rs +++ b/examples/n2c-miniprotocols/src/main.rs @@ -1,10 +1,8 @@ use pallas::network::{ miniprotocols::{chainsync, handshake, localstate, run_agent, Point, MAINNET_MAGIC}, - multiplexer, + multiplexer::{self, bearers::Bearer}, }; -use std::os::unix::net::UnixStream; - #[derive(Debug)] struct LoggingObserver; @@ -86,17 +84,17 @@ fn main() { // we connect to the unix socket of the local node. Make sure you have the right // path for your environment - let bearer = UnixStream::connect("/tmp/node.socket").unwrap(); + let bearer = Bearer::connect_unix("/tmp/node.socket").unwrap(); // setup the multiplexer by specifying the bearer and the IDs of the // miniprotocols to use let mut plexer = multiplexer::StdPlexer::new(bearer); - let channel0 = multiplexer::use_channel(&mut plexer, 0); - let channel7 = multiplexer::use_channel(&mut plexer, 7); - let channel5 = multiplexer::use_channel(&mut plexer, 5); + let channel0 = plexer.use_channel(0); + let channel7 = plexer.use_channel(7); + let channel5 = plexer.use_channel(5); - multiplexer::spawn_muxer(plexer.muxer); - multiplexer::spawn_demuxer(plexer.demuxer); + plexer.muxer.spawn(); + plexer.demuxer.spawn(); // execute the required handshake against the relay do_handshake(channel0); diff --git a/examples/n2n-miniprotocols/src/main.rs b/examples/n2n-miniprotocols/src/main.rs index 3ff4227..4419182 100644 --- a/examples/n2n-miniprotocols/src/main.rs +++ b/examples/n2n-miniprotocols/src/main.rs @@ -1,12 +1,8 @@ -use net2::TcpStreamExt; - use pallas::network::{ miniprotocols::{blockfetch, chainsync, handshake, run_agent, Point, MAINNET_MAGIC}, - multiplexer::{spawn_demuxer, spawn_muxer, use_channel, StdChannel, StdPlexer}, + multiplexer::{bearers::Bearer, StdChannel, StdPlexer}, }; -use std::net::TcpStream; - #[derive(Debug)] struct LoggingObserver; @@ -105,19 +101,17 @@ fn main() { // setup a TCP socket to act as data bearer between our agents and the remote // relay. - let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); - bearer.set_nodelay(true).unwrap(); - bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); + let bearer = Bearer::connect_tcp("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); // setup the multiplexer by specifying the bearer and the IDs of the // miniprotocols to use let mut plexer = StdPlexer::new(bearer); - let channel0 = use_channel(&mut plexer, 0); - let channel3 = use_channel(&mut plexer, 3); - let channel2 = use_channel(&mut plexer, 2); + let channel0 = plexer.use_channel(0); + let channel3 = plexer.use_channel(3); + let channel2 = plexer.use_channel(2); - spawn_muxer(plexer.muxer); - spawn_demuxer(plexer.demuxer); + plexer.muxer.spawn(); + plexer.demuxer.spawn(); // execute the required handshake against the relay do_handshake(channel0); diff --git a/pallas-miniprotocols/Cargo.toml b/pallas-miniprotocols/Cargo.toml index 9cdb24f..ff12759 100644 --- a/pallas-miniprotocols/Cargo.toml +++ b/pallas-miniprotocols/Cargo.toml @@ -18,4 +18,4 @@ pallas-multiplexer = { version = "0.9.0", path = "../pallas-multiplexer/" } log = "0.4.14" hex = "0.4.3" itertools = "0.10.3" -net2 = "0.2.37" +thiserror = "1.0.31" diff --git a/pallas-miniprotocols/src/chainsync/agents.rs b/pallas-miniprotocols/src/chainsync/agents.rs index f6a1fdd..76d6661 100644 --- a/pallas-miniprotocols/src/chainsync/agents.rs +++ b/pallas-miniprotocols/src/chainsync/agents.rs @@ -250,7 +250,7 @@ where self.on_intersect_found(point, tip) } (State::Intersect, Message::IntersectNotFound(tip)) => self.on_intersect_not_found(tip), - (_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg)), + (state, msg) => Err(MachineError::invalid_msg::(state, &msg)), } } } @@ -347,7 +347,7 @@ impl Agent for TipFinder { self.on_intersect_found(tip) } (State::Intersect, Message::IntersectNotFound(tip)) => self.on_intersect_not_found(tip), - (state, msg) => Err(MachineError::InvalidMsgForState(state.clone(), msg)), + (state, msg) => Err(MachineError::invalid_msg::(state, &msg)), } } } diff --git a/pallas-miniprotocols/src/chainsync/protocol.rs b/pallas-miniprotocols/src/chainsync/protocol.rs index 8726513..5fa5b95 100644 --- a/pallas-miniprotocols/src/chainsync/protocol.rs +++ b/pallas-miniprotocols/src/chainsync/protocol.rs @@ -45,5 +45,11 @@ impl Deref for BlockContent { } } +impl From for Vec { + fn from(other: BlockContent) -> Self { + other.0 + } +} + #[derive(Debug)] pub struct SkippedContent; diff --git a/pallas-miniprotocols/src/localstate/mod.rs b/pallas-miniprotocols/src/localstate/mod.rs index cdea7ed..0650850 100644 --- a/pallas-miniprotocols/src/localstate/mod.rs +++ b/pallas-miniprotocols/src/localstate/mod.rs @@ -163,7 +163,7 @@ where (State::Acquiring, Message::Acquired) => self.on_acquired(), (State::Acquiring, Message::Failure(failure)) => self.on_failure(failure), (State::Querying, Message::Result(result)) => self.on_result(result), - (_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg)), + (state, msg) => Err(MachineError::invalid_msg::(state, &msg)), } } } diff --git a/pallas-miniprotocols/src/machines.rs b/pallas-miniprotocols/src/machines.rs index f151a56..2a2aeab 100644 --- a/pallas-miniprotocols/src/machines.rs +++ b/pallas-miniprotocols/src/machines.rs @@ -1,15 +1,21 @@ use pallas_codec::Fragment; use pallas_multiplexer::agents::{Channel, ChannelBuffer, ChannelError}; -use std::cell::Cell; +use std::{cell::Cell, fmt::Debug}; +use thiserror::Error; -#[derive(Debug)] -pub enum MachineError { - InvalidMsgForState(A::State, A::Message), +#[derive(Debug, Error)] +pub enum MachineError { + #[error("invalid message for state [{0}]: {1}")] + InvalidMsgForState(String, String), + + #[error("channel error communicating with multiplexer: {0}")] ChannelError(ChannelError), + + #[error("downstream error while processing business logic {0}")] DownstreamError(Box), } -impl MachineError { +impl MachineError { pub fn channel(err: ChannelError) -> Self { Self::ChannelError(err) } @@ -17,13 +23,17 @@ impl MachineError { pub fn downstream(err: Box) -> Self { Self::DownstreamError(err) } + + pub fn invalid_msg(state: &A::State, msg: &A::Message) -> Self { + Self::InvalidMsgForState(format!("{:?}", state), format!("{:?}", msg)) + } } -pub type Transition = Result>; +pub type Transition = Result; pub trait Agent: Sized { - type Message; - type State; + type Message: std::fmt::Debug; + type State: std::fmt::Debug; fn state(&self) -> &Self::State; fn is_done(&self) -> bool; @@ -56,14 +66,14 @@ where } } - pub fn start(&mut self) -> Result<(), MachineError> { + pub fn start(&mut self) -> Result<(), MachineError> { let prev = self.agent.take().unwrap(); let next = prev.apply_start()?; self.agent.set(Some(next)); Ok(()) } - pub fn run_step(&mut self) -> Result> { + pub fn run_step(&mut self) -> Result { let prev = self.agent.take().unwrap(); let next = run_agent_step(prev, &mut self.buffer)?; let is_done = next.is_done(); @@ -73,7 +83,7 @@ where Ok(is_done) } - pub fn fulfill(mut self) -> Result<(), MachineError> { + pub fn fulfill(mut self) -> Result<(), MachineError> { self.start()?; while self.run_step()? {} diff --git a/pallas-miniprotocols/src/txsubmission/mod.rs b/pallas-miniprotocols/src/txsubmission/mod.rs index e703203..d76c01e 100644 --- a/pallas-miniprotocols/src/txsubmission/mod.rs +++ b/pallas-miniprotocols/src/txsubmission/mod.rs @@ -300,7 +300,7 @@ impl Agent for NaiveProvider { ..self }), (State::Idle, Message::RequestTxs(ids)) => self.on_txs_request(ids), - (_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg)), + (state, msg) => Err(MachineError::invalid_msg::(state, &msg)), } } } diff --git a/pallas-multiplexer/Cargo.toml b/pallas-multiplexer/Cargo.toml index 18a131e..4f04892 100644 --- a/pallas-multiplexer/Cargo.toml +++ b/pallas-multiplexer/Cargo.toml @@ -18,10 +18,11 @@ log = "0.4.14" byteorder = "1.4.3" hex = "0.4.3" rand = "0.8.4" +thiserror = "1.0.31" [dev-dependencies] env_logger = "0.9.0" [features] std = [] -default = ["std"] \ No newline at end of file +default = ["std"] diff --git a/pallas-multiplexer/src/agents.rs b/pallas-multiplexer/src/agents.rs index ee31e76..e5f7442 100644 --- a/pallas-multiplexer/src/agents.rs +++ b/pallas-multiplexer/src/agents.rs @@ -1,13 +1,18 @@ //! Interface to interact with the multiplexer as an agent use crate::Payload; - use pallas_codec::{minicbor, Fragment}; +use thiserror::Error; -#[derive(Debug)] +#[derive(Debug, Error)] pub enum ChannelError { + #[error("channel is not connected, failed to send payload")] NotConnected(Option), + + #[error("failure encoding message into CBOR")] Encoding(String), + + #[error("failure decoding message from CBOR")] Decoding(String), } diff --git a/pallas-multiplexer/src/bearers.rs b/pallas-multiplexer/src/bearers.rs index 2520fd7..65c23a2 100644 --- a/pallas-multiplexer/src/bearers.rs +++ b/pallas-multiplexer/src/bearers.rs @@ -1,28 +1,19 @@ use byteorder::{ByteOrder, NetworkEndian, WriteBytesExt}; use log::{debug, log_enabled, trace}; use std::io::{Read, Write}; -#[cfg(target_family = "unix")] -use std::os::unix::net::UnixStream; +use std::net::{TcpListener, ToSocketAddrs}; use std::{net::TcpStream, time::Instant}; use crate::Payload; +#[cfg(target_family = "unix")] +use std::os::unix::net::UnixStream; + pub struct Segment { pub protocol: u16, pub timestamp: u32, pub payload: Payload, } - -pub trait Bearer: Read + Write + Send + Sync + Sized { - type Error: std::error::Error; - - fn read_segment(&mut self) -> Result, Self::Error>; - - fn write_segment(&mut self, segment: Segment) -> Result<(), Self::Error>; - - fn clone(&self) -> Self; -} - impl Segment { pub fn new(clock: Instant, protocol: u16, payload: Payload) -> Self { Segment { @@ -104,35 +95,74 @@ fn read_segment_with_timeout(reader: &mut impl Read) -> Result, } } -impl Bearer for TcpStream { - type Error = std::io::Error; +pub enum Bearer { + Tcp(TcpStream), - fn clone(&self) -> Self { - self.try_clone().expect("error cloning tcp stream") + #[cfg(target_family = "unix")] + Unix(UnixStream), +} + +impl Bearer { + pub fn connect_tcp(addr: A) -> Result { + let bearer = TcpStream::connect(addr)?; + bearer.set_nodelay(true)?; + + Ok(Bearer::Tcp(bearer)) } - fn read_segment(&mut self) -> Result, std::io::Error> { - read_segment_with_timeout(self) + pub fn accept_tcp(server: TcpListener) -> Result { + let (bearer, _) = server.accept().unwrap(); + bearer.set_nodelay(true)?; + + Ok(Bearer::Tcp(bearer)) } - fn write_segment(&mut self, segment: Segment) -> Result<(), std::io::Error> { - write_segment(self, segment) + #[cfg(target_family = "unix")] + pub fn connect_unix>(path: P) -> Result { + let bearer = UnixStream::connect(path)?; + + Ok(Bearer::Unix(bearer)) + } + + pub fn read_segment(&mut self) -> Result, std::io::Error> { + match self { + Bearer::Tcp(s) => read_segment_with_timeout(s), + + #[cfg(target_family = "unix")] + Bearer::Unix(s) => read_segment_with_timeout(s), + } + } + + pub fn write_segment(&mut self, segment: Segment) -> Result<(), std::io::Error> { + match self { + Bearer::Tcp(s) => write_segment(s, segment), + + #[cfg(target_family = "unix")] + Bearer::Unix(s) => write_segment(s, segment), + } + } +} + +impl From for Bearer { + fn from(stream: TcpStream) -> Self { + Bearer::Tcp(stream) } } #[cfg(target_family = "unix")] -impl Bearer for UnixStream { - type Error = std::io::Error; - - fn clone(&self) -> Self { - self.try_clone().expect("error cloning unix stream") - } - - fn read_segment(&mut self) -> Result, std::io::Error> { - read_segment_with_timeout(self) - } - - fn write_segment(&mut self, segment: Segment) -> Result<(), std::io::Error> { - write_segment(self, segment) +impl From for Bearer { + fn from(stream: UnixStream) -> Self { + Bearer::Unix(stream) + } +} + +impl Clone for Bearer { + fn clone(&self) -> Self { + match self { + Bearer::Tcp(s) => Bearer::Tcp(s.try_clone().expect("error cloning tcp stream")), + + #[cfg(target_family = "unix")] + Bearer::Unix(s) => Bearer::Unix(s.try_clone().expect("error cloning unix stream")), + } } } diff --git a/pallas-multiplexer/src/demux.rs b/pallas-multiplexer/src/demux.rs index 4342085..80c577e 100644 --- a/pallas-multiplexer/src/demux.rs +++ b/pallas-multiplexer/src/demux.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use crate::{bearers::Bearer, std::Cancel, Payload}; +use crate::{bearers::Bearer, Payload}; pub struct EgressError(pub Payload); @@ -8,8 +8,8 @@ pub trait Egress { fn send(&self, payload: Payload) -> Result<(), EgressError>; } -pub enum DemuxError { - BearerError(B::Error), +pub enum DemuxError { + BearerError(std::io::Error), EgressDisconnected(u16, Payload), EgressUnknown(u16, Payload), } @@ -20,17 +20,16 @@ pub enum TickOutcome { } /// A demuxer that reads from a bearer into the corresponding egress -pub struct Demuxer { - bearer: B, +pub struct Demuxer { + bearer: Bearer, egress: HashMap, } -impl Demuxer +impl Demuxer where - B: Bearer, E: Egress, { - pub fn new(bearer: B) -> Self { + pub fn new(bearer: Bearer) -> Self { Demuxer { bearer, egress: Default::default(), @@ -41,7 +40,7 @@ where self.egress.insert(id, tx); } - fn dispatch(&self, protocol: u16, payload: Payload) -> Result<(), DemuxError> { + fn dispatch(&self, protocol: u16, payload: Payload) -> Result<(), DemuxError> { match self.egress.get(&protocol) { Some(tx) => match tx.send(payload) { Err(EgressError(p)) => Err(DemuxError::EgressDisconnected(protocol, p)), @@ -51,7 +50,7 @@ where } } - pub fn tick(&mut self) -> Result> { + pub fn tick(&mut self) -> Result { match self.bearer.read_segment() { Err(err) => Err(DemuxError::BearerError(err)), Ok(None) => Ok(TickOutcome::Idle), @@ -61,23 +60,4 @@ where }, } } - - pub fn block(&mut self, cancel: Cancel) -> Result<(), B::Error> { - loop { - match self.tick() { - Ok(TickOutcome::Busy) => (), - Ok(TickOutcome::Idle) => match cancel.is_set() { - true => break Ok(()), - false => (), - }, - Err(DemuxError::BearerError(err)) => return Err(err), - Err(DemuxError::EgressDisconnected(id, _)) => { - log::warn!("disconnected protocol {}", id) - } - Err(DemuxError::EgressUnknown(id, _)) => { - log::warn!("unknown protocol {}", id) - } - } - } - } } diff --git a/pallas-multiplexer/src/lib.rs b/pallas-multiplexer/src/lib.rs index bb4b852..009fe04 100644 --- a/pallas-multiplexer/src/lib.rs +++ b/pallas-multiplexer/src/lib.rs @@ -3,28 +3,34 @@ pub mod bearers; pub mod demux; pub mod mux; +use bearers::Bearer; + +#[cfg(feature = "std")] +mod std; + +#[cfg(feature = "std")] +pub use crate::std::*; + pub type Payload = Vec; -pub struct Multiplexer +pub struct Multiplexer where - B: bearers::Bearer, I: mux::Ingress, E: demux::Egress, { - pub muxer: mux::Muxer, - pub demuxer: demux::Demuxer, + pub muxer: mux::Muxer, + pub demuxer: demux::Demuxer, } -impl Multiplexer +impl Multiplexer where - B: bearers::Bearer, I: mux::Ingress, E: demux::Egress, { - pub fn new(bearer: B) -> Self { + pub fn new(bearer: Bearer) -> Self { Multiplexer { muxer: mux::Muxer::new(bearer.clone()), - demuxer: demux::Demuxer::new(bearer.clone()), + demuxer: demux::Demuxer::new(bearer), } } @@ -33,9 +39,3 @@ where self.demuxer.register(protocol, egress); } } - -#[cfg(feature = "std")] -mod std; - -#[cfg(feature = "std")] -pub use crate::std::*; diff --git a/pallas-multiplexer/src/mux.rs b/pallas-multiplexer/src/mux.rs index fdd4461..854703d 100644 --- a/pallas-multiplexer/src/mux.rs +++ b/pallas-multiplexer/src/mux.rs @@ -5,7 +5,6 @@ use rand::thread_rng; use crate::{ bearers::{Bearer, Segment}, - std::Cancel, Payload, }; @@ -24,27 +23,23 @@ pub trait Ingress { type Message = (u16, Payload); -pub enum TickOutcome -where - TBearer: Bearer, -{ - BearerError(TBearer::Error), +pub enum TickOutcome { + BearerError(std::io::Error), Idle, Busy, } -pub struct Muxer { - bearer: B, +pub struct Muxer { + bearer: Bearer, ingress: HashMap, clock: Instant, } -impl Muxer +impl Muxer where - B: Bearer, I: Ingress, { - pub fn new(bearer: B) -> Self { + pub fn new(bearer: Bearer) -> Self { Self { bearer, ingress: Default::default(), @@ -93,7 +88,7 @@ where None } - pub fn tick(&mut self) -> TickOutcome { + pub fn tick(&mut self) -> TickOutcome { match self.select() { Some((id, payload)) => { let segment = Segment::new(self.clock, id, payload); @@ -106,17 +101,4 @@ where None => TickOutcome::Idle, } } - - pub fn block(&mut self, cancel: Cancel) -> Result<(), B::Error> { - loop { - match self.tick() { - TickOutcome::BearerError(err) => return Err(err), - TickOutcome::Idle => match cancel.is_set() { - true => break Ok(()), - false => std::thread::yield_now(), - }, - TickOutcome::Busy => (), - } - } - } } diff --git a/pallas-multiplexer/src/std.rs b/pallas-multiplexer/src/std.rs index bd9865e..fb5a09f 100644 --- a/pallas-multiplexer/src/std.rs +++ b/pallas-multiplexer/src/std.rs @@ -1,4 +1,4 @@ -use crate::{agents, bearers::Bearer, demux, mux, Payload}; +use crate::{agents, demux, mux, Payload}; use std::{ sync::{ @@ -7,6 +7,7 @@ use std::{ Arc, }, thread::{spawn, JoinHandle}, + time::Duration, }; pub type StdIngress = Receiver; @@ -32,7 +33,73 @@ impl demux::Egress for StdEgress { } } -pub type StdPlexer = crate::Multiplexer; +pub type StdPlexer = crate::Multiplexer; + +impl StdPlexer { + pub fn use_channel(&mut self, protocol: u16) -> StdChannel { + let (demux_tx, demux_rx) = channel::(); + let (mux_tx, mux_rx) = channel::(); + + self.register_channel(protocol, mux_rx, demux_tx); + + (mux_tx, demux_rx) + } +} + +impl mux::Muxer { + pub fn block(&mut self, cancel: Cancel) -> Result<(), std::io::Error> { + loop { + match self.tick() { + mux::TickOutcome::BearerError(err) => return Err(err), + mux::TickOutcome::Idle => match cancel.is_set() { + true => break Ok(()), + false => { + // TODO: investigate why std::thread::yield_now() hogs the thread + std::thread::sleep(Duration::from_millis(100)) + } + }, + mux::TickOutcome::Busy => (), + } + } + } + + pub fn spawn(mut self) -> Loop { + let cancel = Cancel::default(); + let cancel2 = cancel.clone(); + let thread = spawn(move || self.block(cancel2)); + + Loop { cancel, thread } + } +} + +impl demux::Demuxer { + pub fn block(&mut self, cancel: Cancel) -> Result<(), std::io::Error> { + loop { + match self.tick() { + Ok(demux::TickOutcome::Busy) => (), + Ok(demux::TickOutcome::Idle) => match cancel.is_set() { + true => break Ok(()), + false => (), + }, + Err(demux::DemuxError::BearerError(err)) => return Err(err), + Err(demux::DemuxError::EgressDisconnected(id, _)) => { + log::warn!("disconnected protocol {}", id) + } + Err(demux::DemuxError::EgressUnknown(id, _)) => { + log::warn!("unknown protocol {}", id) + } + } + } + } + + pub fn spawn(mut self) -> Loop { + let cancel = Cancel::default(); + let cancel2 = cancel.clone(); + let thread = spawn(move || self.block(cancel2)); + + Loop { cancel, thread } + } +} pub type StdChannel = (Sender, Receiver); @@ -52,15 +119,6 @@ impl agents::Channel for StdChannel { } } -pub fn use_channel(plexer: &mut StdPlexer, protocol: u16) -> StdChannel { - let (demux_tx, demux_rx) = channel::(); - let (mux_tx, mux_rx) = channel::(); - - plexer.register_channel(protocol, mux_rx, demux_tx); - - (mux_tx, demux_rx) -} - #[derive(Clone, Debug, Default)] pub struct Cancel(Arc); @@ -75,49 +133,17 @@ impl Cancel { } #[derive(Debug)] -pub struct Loop -where - B: Bearer, -{ +pub struct Loop { cancel: Cancel, - thread: JoinHandle>, + thread: JoinHandle>, } -impl Loop -where - B: Bearer, -{ +impl Loop { pub fn cancel(&self) { self.cancel.set(); } - pub fn join(self) -> Result<(), B::Error> { + pub fn join(self) -> Result<(), std::io::Error> { self.thread.join().unwrap() } } - -pub fn spawn_muxer(mut muxer: mux::Muxer) -> Loop -where - B: Bearer + 'static, - B::Error: Send, - I: mux::Ingress + Send + 'static, -{ - let cancel = Cancel::default(); - let cancel2 = cancel.clone(); - let thread = spawn(move || muxer.block(cancel2)); - - Loop { cancel, thread } -} - -pub fn spawn_demuxer(mut demuxer: demux::Demuxer) -> Loop -where - B: Bearer + 'static, - B::Error: Send, - E: demux::Egress + Send + 'static, -{ - let cancel = Cancel::default(); - let cancel2 = cancel.clone(); - let thread = spawn(move || demuxer.block(cancel2)); - - Loop { cancel, thread } -} diff --git a/pallas-multiplexer/tests/integration.rs b/pallas-multiplexer/tests/integration.rs index 6c6c0c9..c217ed4 100644 --- a/pallas-multiplexer/tests/integration.rs +++ b/pallas-multiplexer/tests/integration.rs @@ -1,36 +1,32 @@ use std::{ - net::{Ipv4Addr, SocketAddrV4, TcpListener, TcpStream}, + net::{Ipv4Addr, SocketAddrV4, TcpListener}, thread::{self, JoinHandle}, - time::Duration, }; use log::info; use pallas_codec::minicbor; use pallas_multiplexer::{ agents::{Channel, ChannelBuffer}, - spawn_demuxer, spawn_muxer, use_channel, StdPlexer, + bearers::Bearer, + StdPlexer, }; use rand::{distributions::Uniform, Rng}; -fn setup_passive_muxer() -> JoinHandle> { +fn setup_passive_muxer() -> JoinHandle { thread::spawn(|| { let server = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, P)).unwrap(); info!("listening for connections on port {}", P); - let (bearer, _) = server.accept().unwrap(); - bearer.set_nonblocking(true).unwrap(); - - bearer - .set_read_timeout(Some(Duration::from_secs(3))) - .unwrap(); + let bearer = Bearer::accept_tcp(server).unwrap(); StdPlexer::new(bearer) }) } -fn setup_active_muxer() -> JoinHandle> { +fn setup_active_muxer() -> JoinHandle { thread::spawn(|| { - let bearer = TcpStream::connect(SocketAddrV4::new(Ipv4Addr::LOCALHOST, P)).unwrap(); + let bearer = Bearer::connect_tcp(SocketAddrV4::new(Ipv4Addr::LOCALHOST, P)).unwrap(); + StdPlexer::new(bearer) }) } @@ -53,11 +49,11 @@ fn one_way_small_sequence_of_payloads() { let mut active_plexer = active.join().unwrap(); let mut passive_plexer = passive.join().unwrap(); - let mut sender_channel = use_channel(&mut active_plexer, 0x0003u16); - let mut receiver_channel = use_channel(&mut passive_plexer, 0x8003u16); + let mut sender_channel = active_plexer.use_channel(0x0003u16); + let mut receiver_channel = passive_plexer.use_channel(0x8003u16); - let loop1 = spawn_muxer(active_plexer.muxer); - let loop2 = spawn_demuxer(passive_plexer.demuxer); + active_plexer.muxer.spawn(); + passive_plexer.demuxer.spawn(); for _ in [0..100] { let payload = random_payload(50); @@ -65,47 +61,6 @@ fn one_way_small_sequence_of_payloads() { let received_payload = receiver_channel.dequeue_chunk().unwrap(); assert_eq!(payload, received_payload); } - - loop1.cancel(); - loop1.join().unwrap(); - - loop2.cancel(); - loop2.join().unwrap(); -} - -#[test] -fn threads_cancel_while_still_sending() { - let passive = setup_passive_muxer::<50401>(); - - // HACK: a small sleep seems to be required for Github actions runner to - // formally expose the port - thread::sleep(std::time::Duration::from_secs(1)); - - let active = setup_active_muxer::<50401>(); - - let mut active_plexer = active.join().unwrap(); - let mut passive_plexer = passive.join().unwrap(); - - let mut sender_channel = use_channel(&mut active_plexer, 0x0003u16); - let mut receiver_channel = use_channel(&mut passive_plexer, 0x8003u16); - - let loop1 = spawn_muxer(active_plexer.muxer); - let loop2 = spawn_demuxer(passive_plexer.demuxer); - - thread::spawn(move || loop { - let payload = random_payload(50); - sender_channel.enqueue_chunk(payload.clone()).unwrap(); - let received_payload = receiver_channel.dequeue_chunk().unwrap(); - assert_eq!(payload, received_payload); - }); - - thread::sleep(Duration::from_secs(5)); - - loop1.cancel(); - loop1.join().unwrap(); - - loop2.cancel(); - loop2.join().unwrap(); } #[test] @@ -130,7 +85,7 @@ fn multiple_messages_in_same_payload() { } #[test] -fn fragmented_message_in_multiple_payload() { +fn fragmented_message_in_multiple_payloads() { let mut input = Vec::new(); let msg = (11u8, 12u8, 13u8, 14u8, 15u8, 16u8, 17u8); minicbor::encode(msg, &mut input).unwrap(); diff --git a/pallas-primitives/src/alonzo/address.rs b/pallas-primitives/src/alonzo/address.rs index 908628c..8f93a3a 100644 --- a/pallas-primitives/src/alonzo/address.rs +++ b/pallas-primitives/src/alonzo/address.rs @@ -15,7 +15,7 @@ impl TransactionOutput { #[cfg(test)] mod tests { - use crate::alonzo::{crypto, BlockWrapper, TransactionBodyComponent}; + use crate::alonzo::{BlockWrapper, TransactionBodyComponent}; use crate::Fragment; const KNOWN_ADDRESSES: &[&str] =&[