From 03c36f57eee8afa27454f3599cd8b8c9faeaee98 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Wed, 30 Nov 2022 23:31:09 +0100 Subject: [PATCH] feat(multiplexer): Introduce sync multiplexer option (#210) --- pallas-miniprotocols/src/handshake/client.rs | 4 ++ pallas-multiplexer/Cargo.toml | 3 +- pallas-multiplexer/src/demux.rs | 10 ++-- pallas-multiplexer/src/lib.rs | 3 ++ pallas-multiplexer/src/std.rs | 2 +- pallas-multiplexer/src/sync.rs | 55 ++++++++++++++++++++ 6 files changed, 72 insertions(+), 5 deletions(-) create mode 100644 pallas-multiplexer/src/sync.rs diff --git a/pallas-miniprotocols/src/handshake/client.rs b/pallas-miniprotocols/src/handshake/client.rs index e99d6fc..8d9e8b6 100644 --- a/pallas-miniprotocols/src/handshake/client.rs +++ b/pallas-miniprotocols/src/handshake/client.rs @@ -132,6 +132,10 @@ where self.send_propose(versions)?; self.recv_while_confirm() } + + pub fn unwrap(self) -> H { + self.1.unwrap() + } } pub type N2NClient = Client; diff --git a/pallas-multiplexer/Cargo.toml b/pallas-multiplexer/Cargo.toml index 08893f7..bbaec52 100644 --- a/pallas-multiplexer/Cargo.toml +++ b/pallas-multiplexer/Cargo.toml @@ -23,4 +23,5 @@ env_logger = "0.9.0" [features] std = [] -default = ["std"] +sync = [] +default = ["std", "sync"] diff --git a/pallas-multiplexer/src/demux.rs b/pallas-multiplexer/src/demux.rs index 80c577e..ce9cf2e 100644 --- a/pallas-multiplexer/src/demux.rs +++ b/pallas-multiplexer/src/demux.rs @@ -5,7 +5,7 @@ use crate::{bearers::Bearer, Payload}; pub struct EgressError(pub Payload); pub trait Egress { - fn send(&self, payload: Payload) -> Result<(), EgressError>; + fn send(&mut self, payload: Payload) -> Result<(), EgressError>; } pub enum DemuxError { @@ -40,8 +40,12 @@ where self.egress.insert(id, tx); } - fn dispatch(&self, protocol: u16, payload: Payload) -> Result<(), DemuxError> { - match self.egress.get(&protocol) { + pub fn unregister(&mut self, id: u16) -> Option { + self.egress.remove(&id) + } + + fn dispatch(&mut self, protocol: u16, payload: Payload) -> Result<(), DemuxError> { + match self.egress.get_mut(&protocol) { Some(tx) => match tx.send(payload) { Err(EgressError(p)) => Err(DemuxError::EgressDisconnected(protocol, p)), Ok(_) => Ok(()), diff --git a/pallas-multiplexer/src/lib.rs b/pallas-multiplexer/src/lib.rs index 7d150c5..d6922c5 100644 --- a/pallas-multiplexer/src/lib.rs +++ b/pallas-multiplexer/src/lib.rs @@ -6,6 +6,9 @@ pub mod mux; #[cfg(feature = "std")] mod std; +#[cfg(feature = "sync")] +pub mod sync; + #[cfg(feature = "std")] pub use crate::std::*; diff --git a/pallas-multiplexer/src/std.rs b/pallas-multiplexer/src/std.rs index f8a93a3..320ffdc 100644 --- a/pallas-multiplexer/src/std.rs +++ b/pallas-multiplexer/src/std.rs @@ -29,7 +29,7 @@ impl mux::Ingress for StdIngress { pub type StdEgress = Sender; impl demux::Egress for StdEgress { - fn send(&self, payload: Payload) -> Result<(), demux::EgressError> { + fn send(&mut self, payload: Payload) -> Result<(), demux::EgressError> { match Sender::send(self, payload) { Ok(_) => Ok(()), Err(SendError(p)) => Err(demux::EgressError(p)), diff --git a/pallas-multiplexer/src/sync.rs b/pallas-multiplexer/src/sync.rs new file mode 100644 index 0000000..96f068d --- /dev/null +++ b/pallas-multiplexer/src/sync.rs @@ -0,0 +1,55 @@ +use crate::{ + agents::{self, ChannelBuffer}, + bearers::{Bearer, Segment}, + Payload, +}; + +use std::time::Instant; + +pub struct SyncPlexer { + bearer: Bearer, + protocol: u16, + clock: Instant, +} + +impl SyncPlexer { + pub fn new(bearer: Bearer, protocol: u16) -> Self { + Self { + bearer, + protocol, + clock: Instant::now(), + } + } + + pub fn unwrap(self) -> Bearer { + self.bearer + } +} + +pub type SyncChannel = ChannelBuffer; + +impl agents::Channel for SyncPlexer { + fn enqueue_chunk(&mut self, payload: Payload) -> Result<(), agents::ChannelError> { + let segment = Segment::new(self.clock, self.protocol, payload); + + self.bearer + .write_segment(segment) + .map_err(|_| agents::ChannelError::NotConnected(None)) + } + + fn dequeue_chunk(&mut self) -> Result { + match self.bearer.read_segment() { + Ok(segment) => match segment { + Some(x) => { + assert_eq!( + x.protocol, self.protocol, + "sync plexer received payload for wrong protocol" + ); + Ok(x.payload) + } + None => Err(agents::ChannelError::NotConnected(None)), + }, + Err(_) => Err(agents::ChannelError::NotConnected(None)), + } + } +}