feat: Improve multiplexer ergonomics (#111)
This commit is contained in:
parent
59a3ac3b49
commit
f67e010c76
18 changed files with 242 additions and 261 deletions
|
|
@ -1,17 +1,13 @@
|
|||
use net2::TcpStreamExt;
|
||||
|
||||
use pallas::network::{
|
||||
miniprotocols::{
|
||||
handshake::{n2n::VersionTable, Initiator},
|
||||
run_agent, Point, MAINNET_MAGIC,
|
||||
},
|
||||
multiplexer::{spawn_demuxer, spawn_muxer, use_channel, StdPlexer},
|
||||
multiplexer::{bearers::Bearer, StdPlexer},
|
||||
};
|
||||
|
||||
use pallas::network::miniprotocols::blockfetch::{BatchClient, Observer};
|
||||
|
||||
use std::net::TcpStream;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct BlockPrinter;
|
||||
|
||||
|
|
@ -26,16 +22,14 @@ impl Observer for BlockPrinter {
|
|||
fn main() {
|
||||
env_logger::init();
|
||||
|
||||
let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
|
||||
bearer.set_nodelay(true).unwrap();
|
||||
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
|
||||
let bearer = Bearer::connect_tcp("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
|
||||
|
||||
let mut plexer = StdPlexer::new(bearer);
|
||||
let mut channel0 = use_channel(&mut plexer, 0);
|
||||
let mut channel3 = use_channel(&mut plexer, 3);
|
||||
let mut channel0 = plexer.use_channel(0);
|
||||
let mut channel3 = plexer.use_channel(3);
|
||||
|
||||
spawn_muxer(plexer.muxer);
|
||||
spawn_demuxer(plexer.demuxer);
|
||||
plexer.muxer.spawn();
|
||||
plexer.demuxer.spawn();
|
||||
|
||||
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
|
||||
let _last = run_agent(Initiator::initial(versions), &mut channel0).unwrap();
|
||||
|
|
|
|||
|
|
@ -1,10 +1,8 @@
|
|||
use pallas::network::{
|
||||
miniprotocols::{chainsync, handshake, localstate, run_agent, Point, MAINNET_MAGIC},
|
||||
multiplexer,
|
||||
multiplexer::{self, bearers::Bearer},
|
||||
};
|
||||
|
||||
use std::os::unix::net::UnixStream;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct LoggingObserver;
|
||||
|
||||
|
|
@ -86,17 +84,17 @@ fn main() {
|
|||
|
||||
// we connect to the unix socket of the local node. Make sure you have the right
|
||||
// path for your environment
|
||||
let bearer = UnixStream::connect("/tmp/node.socket").unwrap();
|
||||
let bearer = Bearer::connect_unix("/tmp/node.socket").unwrap();
|
||||
|
||||
// setup the multiplexer by specifying the bearer and the IDs of the
|
||||
// miniprotocols to use
|
||||
let mut plexer = multiplexer::StdPlexer::new(bearer);
|
||||
let channel0 = multiplexer::use_channel(&mut plexer, 0);
|
||||
let channel7 = multiplexer::use_channel(&mut plexer, 7);
|
||||
let channel5 = multiplexer::use_channel(&mut plexer, 5);
|
||||
let channel0 = plexer.use_channel(0);
|
||||
let channel7 = plexer.use_channel(7);
|
||||
let channel5 = plexer.use_channel(5);
|
||||
|
||||
multiplexer::spawn_muxer(plexer.muxer);
|
||||
multiplexer::spawn_demuxer(plexer.demuxer);
|
||||
plexer.muxer.spawn();
|
||||
plexer.demuxer.spawn();
|
||||
|
||||
// execute the required handshake against the relay
|
||||
do_handshake(channel0);
|
||||
|
|
|
|||
|
|
@ -1,12 +1,8 @@
|
|||
use net2::TcpStreamExt;
|
||||
|
||||
use pallas::network::{
|
||||
miniprotocols::{blockfetch, chainsync, handshake, run_agent, Point, MAINNET_MAGIC},
|
||||
multiplexer::{spawn_demuxer, spawn_muxer, use_channel, StdChannel, StdPlexer},
|
||||
multiplexer::{bearers::Bearer, StdChannel, StdPlexer},
|
||||
};
|
||||
|
||||
use std::net::TcpStream;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct LoggingObserver;
|
||||
|
||||
|
|
@ -105,19 +101,17 @@ fn main() {
|
|||
|
||||
// setup a TCP socket to act as data bearer between our agents and the remote
|
||||
// relay.
|
||||
let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
|
||||
bearer.set_nodelay(true).unwrap();
|
||||
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
|
||||
let bearer = Bearer::connect_tcp("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
|
||||
|
||||
// setup the multiplexer by specifying the bearer and the IDs of the
|
||||
// miniprotocols to use
|
||||
let mut plexer = StdPlexer::new(bearer);
|
||||
let channel0 = use_channel(&mut plexer, 0);
|
||||
let channel3 = use_channel(&mut plexer, 3);
|
||||
let channel2 = use_channel(&mut plexer, 2);
|
||||
let channel0 = plexer.use_channel(0);
|
||||
let channel3 = plexer.use_channel(3);
|
||||
let channel2 = plexer.use_channel(2);
|
||||
|
||||
spawn_muxer(plexer.muxer);
|
||||
spawn_demuxer(plexer.demuxer);
|
||||
plexer.muxer.spawn();
|
||||
plexer.demuxer.spawn();
|
||||
|
||||
// execute the required handshake against the relay
|
||||
do_handshake(channel0);
|
||||
|
|
|
|||
|
|
@ -18,4 +18,4 @@ pallas-multiplexer = { version = "0.9.0", path = "../pallas-multiplexer/" }
|
|||
log = "0.4.14"
|
||||
hex = "0.4.3"
|
||||
itertools = "0.10.3"
|
||||
net2 = "0.2.37"
|
||||
thiserror = "1.0.31"
|
||||
|
|
|
|||
|
|
@ -250,7 +250,7 @@ where
|
|||
self.on_intersect_found(point, tip)
|
||||
}
|
||||
(State::Intersect, Message::IntersectNotFound(tip)) => self.on_intersect_not_found(tip),
|
||||
(_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg)),
|
||||
(state, msg) => Err(MachineError::invalid_msg::<Self>(state, &msg)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -347,7 +347,7 @@ impl Agent for TipFinder {
|
|||
self.on_intersect_found(tip)
|
||||
}
|
||||
(State::Intersect, Message::IntersectNotFound(tip)) => self.on_intersect_not_found(tip),
|
||||
(state, msg) => Err(MachineError::InvalidMsgForState(state.clone(), msg)),
|
||||
(state, msg) => Err(MachineError::invalid_msg::<Self>(state, &msg)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,5 +45,11 @@ impl Deref for BlockContent {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<BlockContent> for Vec<u8> {
|
||||
fn from(other: BlockContent) -> Self {
|
||||
other.0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SkippedContent;
|
||||
|
|
|
|||
|
|
@ -163,7 +163,7 @@ where
|
|||
(State::Acquiring, Message::Acquired) => self.on_acquired(),
|
||||
(State::Acquiring, Message::Failure(failure)) => self.on_failure(failure),
|
||||
(State::Querying, Message::Result(result)) => self.on_result(result),
|
||||
(_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg)),
|
||||
(state, msg) => Err(MachineError::invalid_msg::<Self>(state, &msg)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,15 +1,21 @@
|
|||
use pallas_codec::Fragment;
|
||||
use pallas_multiplexer::agents::{Channel, ChannelBuffer, ChannelError};
|
||||
use std::cell::Cell;
|
||||
use std::{cell::Cell, fmt::Debug};
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum MachineError<A: Agent> {
|
||||
InvalidMsgForState(A::State, A::Message),
|
||||
#[derive(Debug, Error)]
|
||||
pub enum MachineError {
|
||||
#[error("invalid message for state [{0}]: {1}")]
|
||||
InvalidMsgForState(String, String),
|
||||
|
||||
#[error("channel error communicating with multiplexer: {0}")]
|
||||
ChannelError(ChannelError),
|
||||
|
||||
#[error("downstream error while processing business logic {0}")]
|
||||
DownstreamError(Box<dyn std::error::Error>),
|
||||
}
|
||||
|
||||
impl<A: Agent> MachineError<A> {
|
||||
impl MachineError {
|
||||
pub fn channel(err: ChannelError) -> Self {
|
||||
Self::ChannelError(err)
|
||||
}
|
||||
|
|
@ -17,13 +23,17 @@ impl<A: Agent> MachineError<A> {
|
|||
pub fn downstream(err: Box<dyn std::error::Error>) -> Self {
|
||||
Self::DownstreamError(err)
|
||||
}
|
||||
|
||||
pub fn invalid_msg<A: Agent>(state: &A::State, msg: &A::Message) -> Self {
|
||||
Self::InvalidMsgForState(format!("{:?}", state), format!("{:?}", msg))
|
||||
}
|
||||
}
|
||||
|
||||
pub type Transition<A> = Result<A, MachineError<A>>;
|
||||
pub type Transition<A> = Result<A, MachineError>;
|
||||
|
||||
pub trait Agent: Sized {
|
||||
type Message;
|
||||
type State;
|
||||
type Message: std::fmt::Debug;
|
||||
type State: std::fmt::Debug;
|
||||
|
||||
fn state(&self) -> &Self::State;
|
||||
fn is_done(&self) -> bool;
|
||||
|
|
@ -56,14 +66,14 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
pub fn start(&mut self) -> Result<(), MachineError<A>> {
|
||||
pub fn start(&mut self) -> Result<(), MachineError> {
|
||||
let prev = self.agent.take().unwrap();
|
||||
let next = prev.apply_start()?;
|
||||
self.agent.set(Some(next));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn run_step(&mut self) -> Result<bool, MachineError<A>> {
|
||||
pub fn run_step(&mut self) -> Result<bool, MachineError> {
|
||||
let prev = self.agent.take().unwrap();
|
||||
let next = run_agent_step(prev, &mut self.buffer)?;
|
||||
let is_done = next.is_done();
|
||||
|
|
@ -73,7 +83,7 @@ where
|
|||
Ok(is_done)
|
||||
}
|
||||
|
||||
pub fn fulfill(mut self) -> Result<(), MachineError<A>> {
|
||||
pub fn fulfill(mut self) -> Result<(), MachineError> {
|
||||
self.start()?;
|
||||
|
||||
while self.run_step()? {}
|
||||
|
|
|
|||
|
|
@ -300,7 +300,7 @@ impl Agent for NaiveProvider {
|
|||
..self
|
||||
}),
|
||||
(State::Idle, Message::RequestTxs(ids)) => self.on_txs_request(ids),
|
||||
(_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg)),
|
||||
(state, msg) => Err(MachineError::invalid_msg::<Self>(state, &msg)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,10 +18,11 @@ log = "0.4.14"
|
|||
byteorder = "1.4.3"
|
||||
hex = "0.4.3"
|
||||
rand = "0.8.4"
|
||||
thiserror = "1.0.31"
|
||||
|
||||
[dev-dependencies]
|
||||
env_logger = "0.9.0"
|
||||
|
||||
[features]
|
||||
std = []
|
||||
default = ["std"]
|
||||
default = ["std"]
|
||||
|
|
|
|||
|
|
@ -1,13 +1,18 @@
|
|||
//! Interface to interact with the multiplexer as an agent
|
||||
|
||||
use crate::Payload;
|
||||
|
||||
use pallas_codec::{minicbor, Fragment};
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ChannelError {
|
||||
#[error("channel is not connected, failed to send payload")]
|
||||
NotConnected(Option<Payload>),
|
||||
|
||||
#[error("failure encoding message into CBOR")]
|
||||
Encoding(String),
|
||||
|
||||
#[error("failure decoding message from CBOR")]
|
||||
Decoding(String),
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,28 +1,19 @@
|
|||
use byteorder::{ByteOrder, NetworkEndian, WriteBytesExt};
|
||||
use log::{debug, log_enabled, trace};
|
||||
use std::io::{Read, Write};
|
||||
#[cfg(target_family = "unix")]
|
||||
use std::os::unix::net::UnixStream;
|
||||
use std::net::{TcpListener, ToSocketAddrs};
|
||||
use std::{net::TcpStream, time::Instant};
|
||||
|
||||
use crate::Payload;
|
||||
|
||||
#[cfg(target_family = "unix")]
|
||||
use std::os::unix::net::UnixStream;
|
||||
|
||||
pub struct Segment {
|
||||
pub protocol: u16,
|
||||
pub timestamp: u32,
|
||||
pub payload: Payload,
|
||||
}
|
||||
|
||||
pub trait Bearer: Read + Write + Send + Sync + Sized {
|
||||
type Error: std::error::Error;
|
||||
|
||||
fn read_segment(&mut self) -> Result<Option<Segment>, Self::Error>;
|
||||
|
||||
fn write_segment(&mut self, segment: Segment) -> Result<(), Self::Error>;
|
||||
|
||||
fn clone(&self) -> Self;
|
||||
}
|
||||
|
||||
impl Segment {
|
||||
pub fn new(clock: Instant, protocol: u16, payload: Payload) -> Self {
|
||||
Segment {
|
||||
|
|
@ -104,35 +95,74 @@ fn read_segment_with_timeout(reader: &mut impl Read) -> Result<Option<Segment>,
|
|||
}
|
||||
}
|
||||
|
||||
impl Bearer for TcpStream {
|
||||
type Error = std::io::Error;
|
||||
pub enum Bearer {
|
||||
Tcp(TcpStream),
|
||||
|
||||
fn clone(&self) -> Self {
|
||||
self.try_clone().expect("error cloning tcp stream")
|
||||
#[cfg(target_family = "unix")]
|
||||
Unix(UnixStream),
|
||||
}
|
||||
|
||||
impl Bearer {
|
||||
pub fn connect_tcp<A: ToSocketAddrs>(addr: A) -> Result<Self, std::io::Error> {
|
||||
let bearer = TcpStream::connect(addr)?;
|
||||
bearer.set_nodelay(true)?;
|
||||
|
||||
Ok(Bearer::Tcp(bearer))
|
||||
}
|
||||
|
||||
fn read_segment(&mut self) -> Result<Option<Segment>, std::io::Error> {
|
||||
read_segment_with_timeout(self)
|
||||
pub fn accept_tcp(server: TcpListener) -> Result<Self, std::io::Error> {
|
||||
let (bearer, _) = server.accept().unwrap();
|
||||
bearer.set_nodelay(true)?;
|
||||
|
||||
Ok(Bearer::Tcp(bearer))
|
||||
}
|
||||
|
||||
fn write_segment(&mut self, segment: Segment) -> Result<(), std::io::Error> {
|
||||
write_segment(self, segment)
|
||||
#[cfg(target_family = "unix")]
|
||||
pub fn connect_unix<P: AsRef<std::path::Path>>(path: P) -> Result<Self, std::io::Error> {
|
||||
let bearer = UnixStream::connect(path)?;
|
||||
|
||||
Ok(Bearer::Unix(bearer))
|
||||
}
|
||||
|
||||
pub fn read_segment(&mut self) -> Result<Option<Segment>, std::io::Error> {
|
||||
match self {
|
||||
Bearer::Tcp(s) => read_segment_with_timeout(s),
|
||||
|
||||
#[cfg(target_family = "unix")]
|
||||
Bearer::Unix(s) => read_segment_with_timeout(s),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn write_segment(&mut self, segment: Segment) -> Result<(), std::io::Error> {
|
||||
match self {
|
||||
Bearer::Tcp(s) => write_segment(s, segment),
|
||||
|
||||
#[cfg(target_family = "unix")]
|
||||
Bearer::Unix(s) => write_segment(s, segment),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TcpStream> for Bearer {
|
||||
fn from(stream: TcpStream) -> Self {
|
||||
Bearer::Tcp(stream)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_family = "unix")]
|
||||
impl Bearer for UnixStream {
|
||||
type Error = std::io::Error;
|
||||
|
||||
fn clone(&self) -> Self {
|
||||
self.try_clone().expect("error cloning unix stream")
|
||||
}
|
||||
|
||||
fn read_segment(&mut self) -> Result<Option<Segment>, std::io::Error> {
|
||||
read_segment_with_timeout(self)
|
||||
}
|
||||
|
||||
fn write_segment(&mut self, segment: Segment) -> Result<(), std::io::Error> {
|
||||
write_segment(self, segment)
|
||||
impl From<UnixStream> for Bearer {
|
||||
fn from(stream: UnixStream) -> Self {
|
||||
Bearer::Unix(stream)
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for Bearer {
|
||||
fn clone(&self) -> Self {
|
||||
match self {
|
||||
Bearer::Tcp(s) => Bearer::Tcp(s.try_clone().expect("error cloning tcp stream")),
|
||||
|
||||
#[cfg(target_family = "unix")]
|
||||
Bearer::Unix(s) => Bearer::Unix(s.try_clone().expect("error cloning unix stream")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use crate::{bearers::Bearer, std::Cancel, Payload};
|
||||
use crate::{bearers::Bearer, Payload};
|
||||
|
||||
pub struct EgressError(pub Payload);
|
||||
|
||||
|
|
@ -8,8 +8,8 @@ pub trait Egress {
|
|||
fn send(&self, payload: Payload) -> Result<(), EgressError>;
|
||||
}
|
||||
|
||||
pub enum DemuxError<B: Bearer> {
|
||||
BearerError(B::Error),
|
||||
pub enum DemuxError {
|
||||
BearerError(std::io::Error),
|
||||
EgressDisconnected(u16, Payload),
|
||||
EgressUnknown(u16, Payload),
|
||||
}
|
||||
|
|
@ -20,17 +20,16 @@ pub enum TickOutcome {
|
|||
}
|
||||
|
||||
/// A demuxer that reads from a bearer into the corresponding egress
|
||||
pub struct Demuxer<B, E> {
|
||||
bearer: B,
|
||||
pub struct Demuxer<E> {
|
||||
bearer: Bearer,
|
||||
egress: HashMap<u16, E>,
|
||||
}
|
||||
|
||||
impl<B, E> Demuxer<B, E>
|
||||
impl<E> Demuxer<E>
|
||||
where
|
||||
B: Bearer,
|
||||
E: Egress,
|
||||
{
|
||||
pub fn new(bearer: B) -> Self {
|
||||
pub fn new(bearer: Bearer) -> Self {
|
||||
Demuxer {
|
||||
bearer,
|
||||
egress: Default::default(),
|
||||
|
|
@ -41,7 +40,7 @@ where
|
|||
self.egress.insert(id, tx);
|
||||
}
|
||||
|
||||
fn dispatch(&self, protocol: u16, payload: Payload) -> Result<(), DemuxError<B>> {
|
||||
fn dispatch(&self, protocol: u16, payload: Payload) -> Result<(), DemuxError> {
|
||||
match self.egress.get(&protocol) {
|
||||
Some(tx) => match tx.send(payload) {
|
||||
Err(EgressError(p)) => Err(DemuxError::EgressDisconnected(protocol, p)),
|
||||
|
|
@ -51,7 +50,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
pub fn tick(&mut self) -> Result<TickOutcome, DemuxError<B>> {
|
||||
pub fn tick(&mut self) -> Result<TickOutcome, DemuxError> {
|
||||
match self.bearer.read_segment() {
|
||||
Err(err) => Err(DemuxError::BearerError(err)),
|
||||
Ok(None) => Ok(TickOutcome::Idle),
|
||||
|
|
@ -61,23 +60,4 @@ where
|
|||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn block(&mut self, cancel: Cancel) -> Result<(), B::Error> {
|
||||
loop {
|
||||
match self.tick() {
|
||||
Ok(TickOutcome::Busy) => (),
|
||||
Ok(TickOutcome::Idle) => match cancel.is_set() {
|
||||
true => break Ok(()),
|
||||
false => (),
|
||||
},
|
||||
Err(DemuxError::BearerError(err)) => return Err(err),
|
||||
Err(DemuxError::EgressDisconnected(id, _)) => {
|
||||
log::warn!("disconnected protocol {}", id)
|
||||
}
|
||||
Err(DemuxError::EgressUnknown(id, _)) => {
|
||||
log::warn!("unknown protocol {}", id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,28 +3,34 @@ pub mod bearers;
|
|||
pub mod demux;
|
||||
pub mod mux;
|
||||
|
||||
use bearers::Bearer;
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
mod std;
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
pub use crate::std::*;
|
||||
|
||||
pub type Payload = Vec<u8>;
|
||||
|
||||
pub struct Multiplexer<B, I, E>
|
||||
pub struct Multiplexer<I, E>
|
||||
where
|
||||
B: bearers::Bearer,
|
||||
I: mux::Ingress,
|
||||
E: demux::Egress,
|
||||
{
|
||||
pub muxer: mux::Muxer<B, I>,
|
||||
pub demuxer: demux::Demuxer<B, E>,
|
||||
pub muxer: mux::Muxer<I>,
|
||||
pub demuxer: demux::Demuxer<E>,
|
||||
}
|
||||
|
||||
impl<B, I, E> Multiplexer<B, I, E>
|
||||
impl<I, E> Multiplexer<I, E>
|
||||
where
|
||||
B: bearers::Bearer,
|
||||
I: mux::Ingress,
|
||||
E: demux::Egress,
|
||||
{
|
||||
pub fn new(bearer: B) -> Self {
|
||||
pub fn new(bearer: Bearer) -> Self {
|
||||
Multiplexer {
|
||||
muxer: mux::Muxer::new(bearer.clone()),
|
||||
demuxer: demux::Demuxer::new(bearer.clone()),
|
||||
demuxer: demux::Demuxer::new(bearer),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -33,9 +39,3 @@ where
|
|||
self.demuxer.register(protocol, egress);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
mod std;
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
pub use crate::std::*;
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ use rand::thread_rng;
|
|||
|
||||
use crate::{
|
||||
bearers::{Bearer, Segment},
|
||||
std::Cancel,
|
||||
Payload,
|
||||
};
|
||||
|
||||
|
|
@ -24,27 +23,23 @@ pub trait Ingress {
|
|||
|
||||
type Message = (u16, Payload);
|
||||
|
||||
pub enum TickOutcome<TBearer>
|
||||
where
|
||||
TBearer: Bearer,
|
||||
{
|
||||
BearerError(TBearer::Error),
|
||||
pub enum TickOutcome {
|
||||
BearerError(std::io::Error),
|
||||
Idle,
|
||||
Busy,
|
||||
}
|
||||
|
||||
pub struct Muxer<B, I> {
|
||||
bearer: B,
|
||||
pub struct Muxer<I> {
|
||||
bearer: Bearer,
|
||||
ingress: HashMap<u16, I>,
|
||||
clock: Instant,
|
||||
}
|
||||
|
||||
impl<B, I> Muxer<B, I>
|
||||
impl<I> Muxer<I>
|
||||
where
|
||||
B: Bearer,
|
||||
I: Ingress,
|
||||
{
|
||||
pub fn new(bearer: B) -> Self {
|
||||
pub fn new(bearer: Bearer) -> Self {
|
||||
Self {
|
||||
bearer,
|
||||
ingress: Default::default(),
|
||||
|
|
@ -93,7 +88,7 @@ where
|
|||
None
|
||||
}
|
||||
|
||||
pub fn tick(&mut self) -> TickOutcome<B> {
|
||||
pub fn tick(&mut self) -> TickOutcome {
|
||||
match self.select() {
|
||||
Some((id, payload)) => {
|
||||
let segment = Segment::new(self.clock, id, payload);
|
||||
|
|
@ -106,17 +101,4 @@ where
|
|||
None => TickOutcome::Idle,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn block(&mut self, cancel: Cancel) -> Result<(), B::Error> {
|
||||
loop {
|
||||
match self.tick() {
|
||||
TickOutcome::BearerError(err) => return Err(err),
|
||||
TickOutcome::Idle => match cancel.is_set() {
|
||||
true => break Ok(()),
|
||||
false => std::thread::yield_now(),
|
||||
},
|
||||
TickOutcome::Busy => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
use crate::{agents, bearers::Bearer, demux, mux, Payload};
|
||||
use crate::{agents, demux, mux, Payload};
|
||||
|
||||
use std::{
|
||||
sync::{
|
||||
|
|
@ -7,6 +7,7 @@ use std::{
|
|||
Arc,
|
||||
},
|
||||
thread::{spawn, JoinHandle},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
pub type StdIngress = Receiver<Payload>;
|
||||
|
|
@ -32,7 +33,73 @@ impl demux::Egress for StdEgress {
|
|||
}
|
||||
}
|
||||
|
||||
pub type StdPlexer<B> = crate::Multiplexer<B, StdIngress, StdEgress>;
|
||||
pub type StdPlexer = crate::Multiplexer<StdIngress, StdEgress>;
|
||||
|
||||
impl StdPlexer {
|
||||
pub fn use_channel(&mut self, protocol: u16) -> StdChannel {
|
||||
let (demux_tx, demux_rx) = channel::<Payload>();
|
||||
let (mux_tx, mux_rx) = channel::<Payload>();
|
||||
|
||||
self.register_channel(protocol, mux_rx, demux_tx);
|
||||
|
||||
(mux_tx, demux_rx)
|
||||
}
|
||||
}
|
||||
|
||||
impl mux::Muxer<StdIngress> {
|
||||
pub fn block(&mut self, cancel: Cancel) -> Result<(), std::io::Error> {
|
||||
loop {
|
||||
match self.tick() {
|
||||
mux::TickOutcome::BearerError(err) => return Err(err),
|
||||
mux::TickOutcome::Idle => match cancel.is_set() {
|
||||
true => break Ok(()),
|
||||
false => {
|
||||
// TODO: investigate why std::thread::yield_now() hogs the thread
|
||||
std::thread::sleep(Duration::from_millis(100))
|
||||
}
|
||||
},
|
||||
mux::TickOutcome::Busy => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn(mut self) -> Loop {
|
||||
let cancel = Cancel::default();
|
||||
let cancel2 = cancel.clone();
|
||||
let thread = spawn(move || self.block(cancel2));
|
||||
|
||||
Loop { cancel, thread }
|
||||
}
|
||||
}
|
||||
|
||||
impl demux::Demuxer<StdEgress> {
|
||||
pub fn block(&mut self, cancel: Cancel) -> Result<(), std::io::Error> {
|
||||
loop {
|
||||
match self.tick() {
|
||||
Ok(demux::TickOutcome::Busy) => (),
|
||||
Ok(demux::TickOutcome::Idle) => match cancel.is_set() {
|
||||
true => break Ok(()),
|
||||
false => (),
|
||||
},
|
||||
Err(demux::DemuxError::BearerError(err)) => return Err(err),
|
||||
Err(demux::DemuxError::EgressDisconnected(id, _)) => {
|
||||
log::warn!("disconnected protocol {}", id)
|
||||
}
|
||||
Err(demux::DemuxError::EgressUnknown(id, _)) => {
|
||||
log::warn!("unknown protocol {}", id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn(mut self) -> Loop {
|
||||
let cancel = Cancel::default();
|
||||
let cancel2 = cancel.clone();
|
||||
let thread = spawn(move || self.block(cancel2));
|
||||
|
||||
Loop { cancel, thread }
|
||||
}
|
||||
}
|
||||
|
||||
pub type StdChannel = (Sender<Payload>, Receiver<Payload>);
|
||||
|
||||
|
|
@ -52,15 +119,6 @@ impl agents::Channel for StdChannel {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn use_channel<B: Bearer>(plexer: &mut StdPlexer<B>, protocol: u16) -> StdChannel {
|
||||
let (demux_tx, demux_rx) = channel::<Payload>();
|
||||
let (mux_tx, mux_rx) = channel::<Payload>();
|
||||
|
||||
plexer.register_channel(protocol, mux_rx, demux_tx);
|
||||
|
||||
(mux_tx, demux_rx)
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct Cancel(Arc<AtomicBool>);
|
||||
|
||||
|
|
@ -75,49 +133,17 @@ impl Cancel {
|
|||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Loop<B>
|
||||
where
|
||||
B: Bearer,
|
||||
{
|
||||
pub struct Loop {
|
||||
cancel: Cancel,
|
||||
thread: JoinHandle<Result<(), B::Error>>,
|
||||
thread: JoinHandle<Result<(), std::io::Error>>,
|
||||
}
|
||||
|
||||
impl<B> Loop<B>
|
||||
where
|
||||
B: Bearer,
|
||||
{
|
||||
impl Loop {
|
||||
pub fn cancel(&self) {
|
||||
self.cancel.set();
|
||||
}
|
||||
|
||||
pub fn join(self) -> Result<(), B::Error> {
|
||||
pub fn join(self) -> Result<(), std::io::Error> {
|
||||
self.thread.join().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_muxer<B, I>(mut muxer: mux::Muxer<B, I>) -> Loop<B>
|
||||
where
|
||||
B: Bearer + 'static,
|
||||
B::Error: Send,
|
||||
I: mux::Ingress + Send + 'static,
|
||||
{
|
||||
let cancel = Cancel::default();
|
||||
let cancel2 = cancel.clone();
|
||||
let thread = spawn(move || muxer.block(cancel2));
|
||||
|
||||
Loop { cancel, thread }
|
||||
}
|
||||
|
||||
pub fn spawn_demuxer<B, E>(mut demuxer: demux::Demuxer<B, E>) -> Loop<B>
|
||||
where
|
||||
B: Bearer + 'static,
|
||||
B::Error: Send,
|
||||
E: demux::Egress + Send + 'static,
|
||||
{
|
||||
let cancel = Cancel::default();
|
||||
let cancel2 = cancel.clone();
|
||||
let thread = spawn(move || demuxer.block(cancel2));
|
||||
|
||||
Loop { cancel, thread }
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,36 +1,32 @@
|
|||
use std::{
|
||||
net::{Ipv4Addr, SocketAddrV4, TcpListener, TcpStream},
|
||||
net::{Ipv4Addr, SocketAddrV4, TcpListener},
|
||||
thread::{self, JoinHandle},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use log::info;
|
||||
use pallas_codec::minicbor;
|
||||
use pallas_multiplexer::{
|
||||
agents::{Channel, ChannelBuffer},
|
||||
spawn_demuxer, spawn_muxer, use_channel, StdPlexer,
|
||||
bearers::Bearer,
|
||||
StdPlexer,
|
||||
};
|
||||
use rand::{distributions::Uniform, Rng};
|
||||
|
||||
fn setup_passive_muxer<const P: u16>() -> JoinHandle<StdPlexer<TcpStream>> {
|
||||
fn setup_passive_muxer<const P: u16>() -> JoinHandle<StdPlexer> {
|
||||
thread::spawn(|| {
|
||||
let server = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, P)).unwrap();
|
||||
info!("listening for connections on port {}", P);
|
||||
let (bearer, _) = server.accept().unwrap();
|
||||
|
||||
bearer.set_nonblocking(true).unwrap();
|
||||
|
||||
bearer
|
||||
.set_read_timeout(Some(Duration::from_secs(3)))
|
||||
.unwrap();
|
||||
let bearer = Bearer::accept_tcp(server).unwrap();
|
||||
|
||||
StdPlexer::new(bearer)
|
||||
})
|
||||
}
|
||||
|
||||
fn setup_active_muxer<const P: u16>() -> JoinHandle<StdPlexer<TcpStream>> {
|
||||
fn setup_active_muxer<const P: u16>() -> JoinHandle<StdPlexer> {
|
||||
thread::spawn(|| {
|
||||
let bearer = TcpStream::connect(SocketAddrV4::new(Ipv4Addr::LOCALHOST, P)).unwrap();
|
||||
let bearer = Bearer::connect_tcp(SocketAddrV4::new(Ipv4Addr::LOCALHOST, P)).unwrap();
|
||||
|
||||
StdPlexer::new(bearer)
|
||||
})
|
||||
}
|
||||
|
|
@ -53,11 +49,11 @@ fn one_way_small_sequence_of_payloads() {
|
|||
let mut active_plexer = active.join().unwrap();
|
||||
let mut passive_plexer = passive.join().unwrap();
|
||||
|
||||
let mut sender_channel = use_channel(&mut active_plexer, 0x0003u16);
|
||||
let mut receiver_channel = use_channel(&mut passive_plexer, 0x8003u16);
|
||||
let mut sender_channel = active_plexer.use_channel(0x0003u16);
|
||||
let mut receiver_channel = passive_plexer.use_channel(0x8003u16);
|
||||
|
||||
let loop1 = spawn_muxer(active_plexer.muxer);
|
||||
let loop2 = spawn_demuxer(passive_plexer.demuxer);
|
||||
active_plexer.muxer.spawn();
|
||||
passive_plexer.demuxer.spawn();
|
||||
|
||||
for _ in [0..100] {
|
||||
let payload = random_payload(50);
|
||||
|
|
@ -65,47 +61,6 @@ fn one_way_small_sequence_of_payloads() {
|
|||
let received_payload = receiver_channel.dequeue_chunk().unwrap();
|
||||
assert_eq!(payload, received_payload);
|
||||
}
|
||||
|
||||
loop1.cancel();
|
||||
loop1.join().unwrap();
|
||||
|
||||
loop2.cancel();
|
||||
loop2.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn threads_cancel_while_still_sending() {
|
||||
let passive = setup_passive_muxer::<50401>();
|
||||
|
||||
// HACK: a small sleep seems to be required for Github actions runner to
|
||||
// formally expose the port
|
||||
thread::sleep(std::time::Duration::from_secs(1));
|
||||
|
||||
let active = setup_active_muxer::<50401>();
|
||||
|
||||
let mut active_plexer = active.join().unwrap();
|
||||
let mut passive_plexer = passive.join().unwrap();
|
||||
|
||||
let mut sender_channel = use_channel(&mut active_plexer, 0x0003u16);
|
||||
let mut receiver_channel = use_channel(&mut passive_plexer, 0x8003u16);
|
||||
|
||||
let loop1 = spawn_muxer(active_plexer.muxer);
|
||||
let loop2 = spawn_demuxer(passive_plexer.demuxer);
|
||||
|
||||
thread::spawn(move || loop {
|
||||
let payload = random_payload(50);
|
||||
sender_channel.enqueue_chunk(payload.clone()).unwrap();
|
||||
let received_payload = receiver_channel.dequeue_chunk().unwrap();
|
||||
assert_eq!(payload, received_payload);
|
||||
});
|
||||
|
||||
thread::sleep(Duration::from_secs(5));
|
||||
|
||||
loop1.cancel();
|
||||
loop1.join().unwrap();
|
||||
|
||||
loop2.cancel();
|
||||
loop2.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -130,7 +85,7 @@ fn multiple_messages_in_same_payload() {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn fragmented_message_in_multiple_payload() {
|
||||
fn fragmented_message_in_multiple_payloads() {
|
||||
let mut input = Vec::new();
|
||||
let msg = (11u8, 12u8, 13u8, 14u8, 15u8, 16u8, 17u8);
|
||||
minicbor::encode(msg, &mut input).unwrap();
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ impl TransactionOutput {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::alonzo::{crypto, BlockWrapper, TransactionBodyComponent};
|
||||
use crate::alonzo::{BlockWrapper, TransactionBodyComponent};
|
||||
use crate::Fragment;
|
||||
|
||||
const KNOWN_ADDRESSES: &[&str] =&[
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue