Improve multiplexer lib semantics

This commit is contained in:
Santiago Carmuega 2021-11-22 14:31:54 -03:00
parent 1bd7797d78
commit bf8581e361
11 changed files with 76 additions and 51 deletions

View file

@ -11,15 +11,14 @@ fn main() {
env_logger::init();
//let bearer = TcpStream::connect("localhost:6000").unwrap();
let bearer =
TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut handles = Multiplexer::new(bearer, &vec![0, 3]).unwrap();
let (_, rx, tx) = handles.remove(0);
let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 3]).unwrap();
let (rx, tx) = muxer.use_channel(0);
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), rx, &tx).unwrap();
println!("{:?}", last);
@ -37,11 +36,8 @@ fn main() {
),
);
let (_, bf_rx, bf_tx) = handles.remove(0);
let (bf_rx, bf_tx) = muxer.use_channel(3);
let bf = BlockFetchClient::initial(range);
let bf_last = run_agent(bf, bf_rx, &bf_tx);
println!("{:?}", bf_last);
}

View file

@ -19,14 +19,14 @@ fn main() {
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut handles = Multiplexer::new(bearer, &vec![0, 4, 5]).unwrap();
let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 4, 5]).unwrap();
let (_, rx, tx) = handles.remove(0);
let (rx, tx) = muxer.use_channel(0);
let versions = VersionTable::v1_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), rx, &tx).unwrap();
println!("last hanshake state: {:?}", last);
let (_, ts_rx, ts_tx) = handles.remove(0);
let (ts_rx, ts_tx) = muxer.use_channel(4);
let ts = NaiveProvider::initial(vec![]);
let ts = run_agent(ts, ts_rx, &ts_tx).unwrap();
println!("last tx-submission state: {:?}", ts);
@ -36,7 +36,7 @@ fn main() {
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(),
)];
let (_, cs_rx, cs_tx) = handles.remove(0);
let (cs_rx, cs_tx) = muxer.use_channel(5);
let cs = Consumer::initial(known_points);
let cs = run_agent(cs, cs_rx, &cs_tx).unwrap();
println!("{:?}", cs);

View file

@ -16,8 +16,8 @@ fn main() {
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut handles = Multiplexer::new(bearer, &vec![0, 2]).unwrap();
let (_, rx, tx) = handles.remove(0);
let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 2]).unwrap();
let (rx, tx) = muxer.use_channel(0);
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), rx, &tx).unwrap();
@ -28,7 +28,7 @@ fn main() {
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(),
)];
let (_, cs_rx, cs_tx) = handles.remove(0);
let (cs_rx, cs_tx) = muxer.use_channel(2);
let cs = Consumer::initial(known_points);
let cs = run_agent(cs, cs_rx, &cs_tx).unwrap();

View file

@ -1,7 +1,7 @@
use std::fmt::Debug;
use log::{debug, log_enabled, trace, warn};
use minicbor::encode;
use pallas_machines::{
Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder,
PayloadEncoder, Transition,

View file

@ -10,15 +10,14 @@ fn main() {
env_logger::init();
//let bearer = TcpStream::connect("localhost:6000").unwrap();
let bearer =
TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut handles = Multiplexer::new(bearer, &vec![0]).unwrap();
let (_, rx, tx) = handles.remove(0);
let mut muxer = Multiplexer::try_setup(bearer, &vec![0]).unwrap();
let (rx, tx) = muxer.use_channel(0);
let versions = VersionTable::v1_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), rx, &tx).unwrap();

View file

@ -16,8 +16,8 @@ fn main() {
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut handles = Multiplexer::new(bearer, &vec![0]).unwrap();
let (_, rx, tx) = handles.remove(0);
let mut muxer = Multiplexer::try_setup(bearer, &vec![0]).unwrap();
let (rx, tx) = muxer.use_channel(0);
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), rx, &tx).unwrap();

View file

@ -1,7 +1,6 @@
use core::panic;
use std::collections::HashMap;
use itertools::Merge;
use pallas_machines::{
Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder,
PayloadEncoder,

View file

@ -2,21 +2,25 @@ use std::{net::TcpListener, thread, time::Duration};
use pallas_multiplexer::Multiplexer;
const PROTOCOLS: [u16; 2] = [0x8002u16, 0x8003u16];
fn main() {
env_logger::init();
let server = TcpListener::bind("0.0.0.0:3001").unwrap();
let (bearer, _) = server.accept().unwrap();
let handles =
Multiplexer::new(bearer, &vec![0x8002u16, 0x8003u16][..]).unwrap();
let mut muxer = Multiplexer::try_setup(bearer, &PROTOCOLS).unwrap();
for handle in handles {
for protocol in PROTOCOLS {
let handle = muxer.use_channel(protocol);
thread::spawn(move || {
let (id, rx, tx) = handle;
let (rx, _tx) = handle;
loop {
let payload = rx.recv().unwrap();
println!("id:{}, length:{}", id, payload.len());
println!("id:{}, length:{}", protocol, payload.len());
}
});
}

View file

@ -2,22 +2,25 @@ use std::{net::TcpStream, thread, time::Duration};
use pallas_multiplexer::Multiplexer;
const PROTOCOLS: [u16; 2] = [0x0002u16, 0x0003u16];
fn main() {
env_logger::init();
let bearer = TcpStream::connect("127.0.0.1:3001").unwrap();
let handles =
Multiplexer::new(bearer, &vec![0x0002u16, 0x0003u16][..]).unwrap();
let mut muxer = Multiplexer::try_setup(bearer, &PROTOCOLS).unwrap();
for protocol in PROTOCOLS {
let handle = muxer.use_channel(protocol);
for (idx, handle) in handles.into_iter().enumerate() {
thread::spawn(move || {
let (id, rx, tx) = handle;
let (_rx, tx) = handle;
loop {
let payload = vec![1; 65545];
tx.send(payload).unwrap();
thread::sleep(Duration::from_millis(
50u64 + (idx as u64 * 10u64),
50u64 + (protocol as u64 * 10u64),
));
}
});

View file

@ -1,9 +1,10 @@
use std::{
borrow::Borrow,
collections::HashMap,
io::{Read, Write},
net::TcpStream,
sync::mpsc::{self, Receiver, Sender, TryRecvError},
thread,
thread::{self, JoinHandle},
time::{Duration, Instant},
};
@ -52,7 +53,7 @@ impl Bearer for TcpStream {
fn read_segment(&mut self) -> Result<(u16, u32, Payload), std::io::Error> {
let mut header = [0u8; 8];
self.read_exact(&mut header)?;
if log_enabled!(log::Level::Trace) {
@ -63,7 +64,10 @@ impl Bearer for TcpStream {
let id = NetworkEndian::read_u16(&mut header[4..6]) as usize ^ 0x8000;
let ts = NetworkEndian::read_u32(&mut header[0..4]);
debug!("parsed inbound msg, protocol id: {}, ts: {}, payload length: {}", id, ts, length);
debug!(
"parsed inbound msg, protocol id: {}, ts: {}, payload length: {}",
id, ts, length
);
let mut payload = vec![0u8; length];
self.read_exact(&mut payload)?;
@ -166,19 +170,21 @@ where
}
}
type ChannelProtocolHandle = (u16, Receiver<Payload>, Sender<Payload>);
type ChannelProtocolIO = (Receiver<Payload>, Sender<Payload>);
type ChannelProtocolHandle = (u16, ChannelProtocolIO);
type ChannelIngressHandle = (u16, Receiver<Payload>);
type ChannelEgressHandle = (u16, Sender<Payload>);
type MuxIngress = Vec<ChannelIngressHandle>;
type DemuxerEgress = Vec<ChannelEgressHandle>;
pub struct Multiplexer {}
pub struct Multiplexer {
tx_thread: JoinHandle<()>,
rx_thread: JoinHandle<()>,
io_handles: HashMap<u16, ChannelProtocolIO>,
}
impl Multiplexer {
pub fn new<TBearer>(
bearer: TBearer,
protocols: &[u16],
) -> Result<Vec<ChannelProtocolHandle>, Error>
pub fn try_setup<TBearer>(bearer: TBearer, protocols: &[u16]) -> Result<Multiplexer, Error>
where
TBearer: Bearer + 'static,
{
@ -188,7 +194,8 @@ impl Multiplexer {
let (demux_tx, demux_rx) = mpsc::channel::<Payload>();
let (mux_tx, mux_rx) = mpsc::channel::<Payload>();
let protocol_handle: ChannelProtocolHandle = (*id, demux_rx, mux_tx);
let protocol_io = (demux_rx, mux_tx);
let protocol_handle: ChannelProtocolHandle = (*id, protocol_io);
let ingress_handle: ChannelIngressHandle = (*id, mux_rx);
let egress_handle: ChannelEgressHandle = (*id, demux_tx);
@ -201,11 +208,29 @@ impl Multiplexer {
let (ingress, egress): (Vec<_>, Vec<_>) = multiplex_handles.into_iter().unzip();
let mut tx_bearer = bearer.clone();
thread::spawn(move || tx_loop(&mut tx_bearer, ingress));
let tx_thread = thread::spawn(move || tx_loop(&mut tx_bearer, ingress));
let mut rx_bearer = bearer.clone();
thread::spawn(move || rx_loop(&mut rx_bearer, egress));
let rx_thread = thread::spawn(move || rx_loop(&mut rx_bearer, egress));
Ok(protocol_handles)
let io_handles: HashMap<u16, ChannelProtocolIO> = protocol_handles.into_iter().collect();
Ok(Multiplexer {
io_handles,
tx_thread,
rx_thread,
})
}
pub fn use_channel(&mut self, protocol_id: u16) -> ChannelProtocolIO {
self
.io_handles
.remove(&protocol_id)
.unwrap()
}
pub fn join(self) {
self.tx_thread.join().unwrap();
self.rx_thread.join().unwrap();
}
}

View file

@ -16,16 +16,15 @@ fn main() {
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut handles = Multiplexer::new(bearer, &vec![0, 4]).unwrap();
let (_, rx, tx) = handles.remove(0);
let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 4]).unwrap();
let (hs_rx, hs_tx) = muxer.use_channel(0);
let versions = VersionTable::v1_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), rx, &tx).unwrap();
let last = run_agent(Client::initial(versions), hs_rx, &hs_tx).unwrap();
println!("{:?}", last);
let (_, ts_rx, ts_tx) = handles.remove(0);
let (ts_rx, ts_tx) = muxer.use_channel(4);
let ts = NaiveProvider::initial(vec![]);
let ts = run_agent(ts, ts_rx, &ts_tx).unwrap();