feat(multiplexer): Introduce sync multiplexer option (#210)
This commit is contained in:
parent
323402eb54
commit
03c36f57ee
6 changed files with 72 additions and 5 deletions
|
|
@ -132,6 +132,10 @@ where
|
|||
self.send_propose(versions)?;
|
||||
self.recv_while_confirm()
|
||||
}
|
||||
|
||||
pub fn unwrap(self) -> H {
|
||||
self.1.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
pub type N2NClient<H> = Client<H, super::n2n::VersionData>;
|
||||
|
|
|
|||
|
|
@ -23,4 +23,5 @@ env_logger = "0.9.0"
|
|||
|
||||
[features]
|
||||
std = []
|
||||
default = ["std"]
|
||||
sync = []
|
||||
default = ["std", "sync"]
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ use crate::{bearers::Bearer, Payload};
|
|||
pub struct EgressError(pub Payload);
|
||||
|
||||
pub trait Egress {
|
||||
fn send(&self, payload: Payload) -> Result<(), EgressError>;
|
||||
fn send(&mut self, payload: Payload) -> Result<(), EgressError>;
|
||||
}
|
||||
|
||||
pub enum DemuxError {
|
||||
|
|
@ -40,8 +40,12 @@ where
|
|||
self.egress.insert(id, tx);
|
||||
}
|
||||
|
||||
fn dispatch(&self, protocol: u16, payload: Payload) -> Result<(), DemuxError> {
|
||||
match self.egress.get(&protocol) {
|
||||
pub fn unregister(&mut self, id: u16) -> Option<E> {
|
||||
self.egress.remove(&id)
|
||||
}
|
||||
|
||||
fn dispatch(&mut self, protocol: u16, payload: Payload) -> Result<(), DemuxError> {
|
||||
match self.egress.get_mut(&protocol) {
|
||||
Some(tx) => match tx.send(payload) {
|
||||
Err(EgressError(p)) => Err(DemuxError::EgressDisconnected(protocol, p)),
|
||||
Ok(_) => Ok(()),
|
||||
|
|
|
|||
|
|
@ -6,6 +6,9 @@ pub mod mux;
|
|||
#[cfg(feature = "std")]
|
||||
mod std;
|
||||
|
||||
#[cfg(feature = "sync")]
|
||||
pub mod sync;
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
pub use crate::std::*;
|
||||
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ impl mux::Ingress for StdIngress {
|
|||
pub type StdEgress = Sender<Payload>;
|
||||
|
||||
impl demux::Egress for StdEgress {
|
||||
fn send(&self, payload: Payload) -> Result<(), demux::EgressError> {
|
||||
fn send(&mut self, payload: Payload) -> Result<(), demux::EgressError> {
|
||||
match Sender::send(self, payload) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(SendError(p)) => Err(demux::EgressError(p)),
|
||||
|
|
|
|||
55
pallas-multiplexer/src/sync.rs
Normal file
55
pallas-multiplexer/src/sync.rs
Normal file
|
|
@ -0,0 +1,55 @@
|
|||
use crate::{
|
||||
agents::{self, ChannelBuffer},
|
||||
bearers::{Bearer, Segment},
|
||||
Payload,
|
||||
};
|
||||
|
||||
use std::time::Instant;
|
||||
|
||||
pub struct SyncPlexer {
|
||||
bearer: Bearer,
|
||||
protocol: u16,
|
||||
clock: Instant,
|
||||
}
|
||||
|
||||
impl SyncPlexer {
|
||||
pub fn new(bearer: Bearer, protocol: u16) -> Self {
|
||||
Self {
|
||||
bearer,
|
||||
protocol,
|
||||
clock: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unwrap(self) -> Bearer {
|
||||
self.bearer
|
||||
}
|
||||
}
|
||||
|
||||
pub type SyncChannel = ChannelBuffer<SyncPlexer>;
|
||||
|
||||
impl agents::Channel for SyncPlexer {
|
||||
fn enqueue_chunk(&mut self, payload: Payload) -> Result<(), agents::ChannelError> {
|
||||
let segment = Segment::new(self.clock, self.protocol, payload);
|
||||
|
||||
self.bearer
|
||||
.write_segment(segment)
|
||||
.map_err(|_| agents::ChannelError::NotConnected(None))
|
||||
}
|
||||
|
||||
fn dequeue_chunk(&mut self) -> Result<Payload, agents::ChannelError> {
|
||||
match self.bearer.read_segment() {
|
||||
Ok(segment) => match segment {
|
||||
Some(x) => {
|
||||
assert_eq!(
|
||||
x.protocol, self.protocol,
|
||||
"sync plexer received payload for wrong protocol"
|
||||
);
|
||||
Ok(x.payload)
|
||||
}
|
||||
None => Err(agents::ChannelError::NotConnected(None)),
|
||||
},
|
||||
Err(_) => Err(agents::ChannelError::NotConnected(None)),
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue