From 0f4f98dd3d6e2cea4b8d955cc0a7bf7a014ae46f Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sun, 28 Nov 2021 17:03:38 -0300 Subject: [PATCH] Tidy up multiplexer api --- pallas-blockfetch/examples/client.rs | 10 +++++----- pallas-chainsync/examples/blocks.rs | 10 +++++----- pallas-chainsync/examples/headers.rs | 11 +++++------ pallas-handshake/examples/client.rs | 6 +++--- pallas-handshake/examples/node.rs | 6 +++--- pallas-localstate/examples/chainpoint.rs | 10 +++++----- pallas-machines/src/lib.rs | 9 +++++---- pallas-multiplexer/examples/listener.rs | 6 +++--- pallas-multiplexer/examples/sender.rs | 6 +++--- pallas-multiplexer/src/lib.rs | 18 ++++++++++-------- pallas-txsubmission/examples/naive.rs | 10 +++++----- 11 files changed, 52 insertions(+), 50 deletions(-) diff --git a/pallas-blockfetch/examples/client.rs b/pallas-blockfetch/examples/client.rs index 8eb670d..4ccd857 100644 --- a/pallas-blockfetch/examples/client.rs +++ b/pallas-blockfetch/examples/client.rs @@ -16,11 +16,11 @@ fn main() { bearer.set_nodelay(true).unwrap(); bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); - let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 3]).unwrap(); + let mut muxer = Multiplexer::setup(bearer, &vec![0, 3]).unwrap(); - let (rx, tx) = muxer.use_channel(0); + let hs_channel = muxer.use_channel(0); let versions = VersionTable::v4_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), rx, &tx).unwrap(); + let last = run_agent(Client::initial(versions), hs_channel).unwrap(); println!("{:?}", last); let range = ( @@ -36,8 +36,8 @@ fn main() { ), ); - let (bf_rx, bf_tx) = muxer.use_channel(3); + let bf_channel = muxer.use_channel(3); let bf = BlockFetchClient::initial(range); - let bf_last = run_agent(bf, bf_rx, &bf_tx); + let bf_last = run_agent(bf, bf_channel); println!("{:?}", bf_last); } diff --git a/pallas-chainsync/examples/blocks.rs b/pallas-chainsync/examples/blocks.rs index 5d3251d..bc49478 100644 --- a/pallas-chainsync/examples/blocks.rs +++ b/pallas-chainsync/examples/blocks.rs @@ -12,11 +12,11 @@ fn main() { // path for your environment let bearer = UnixStream::connect("/tmp/node.socket").unwrap(); - let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 4, 5]).unwrap(); + let mut muxer = Multiplexer::setup(bearer, &vec![0, 4, 5]).unwrap(); - let (rx, tx) = muxer.use_channel(0); + let hs_channel = muxer.use_channel(0); let versions = VersionTable::v1_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), rx, &tx).unwrap(); + let last = run_agent(Client::initial(versions), hs_channel).unwrap(); println!("last hanshake state: {:?}", last); // some random known-point in the chain to use as starting point for the sync @@ -25,8 +25,8 @@ fn main() { hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(), )]; - let (cs_rx, cs_tx) = muxer.use_channel(5); + let cs_channel = muxer.use_channel(5); let cs = ClientConsumer::initial(known_points, NoopStorage { }); - let cs = run_agent(cs, cs_rx, &cs_tx).unwrap(); + let cs = run_agent(cs, cs_channel).unwrap(); println!("{:?}", cs); } diff --git a/pallas-chainsync/examples/headers.rs b/pallas-chainsync/examples/headers.rs index 683fea0..e977748 100644 --- a/pallas-chainsync/examples/headers.rs +++ b/pallas-chainsync/examples/headers.rs @@ -11,15 +11,14 @@ fn main() { env_logger::init(); let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); - bearer.set_nodelay(true).unwrap(); bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); - let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 2]).unwrap(); - let (rx, tx) = muxer.use_channel(0); + let mut muxer = Multiplexer::setup(bearer, &vec![0, 2]).unwrap(); + let hs_channel = muxer.use_channel(0); let versions = VersionTable::v4_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), rx, &tx).unwrap(); + let last = run_agent(Client::initial(versions), hs_channel).unwrap(); println!("{:?}", last); let known_points = vec![Point( @@ -27,10 +26,10 @@ fn main() { hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(), )]; - let (cs_rx, cs_tx) = muxer.use_channel(2); + let cs_channel = muxer.use_channel(2); let cs = ClientConsumer::initial(known_points, NoopStorage {}); - let cs = run_agent(cs, cs_rx, &cs_tx).unwrap(); + let cs = run_agent(cs, cs_channel).unwrap(); println!("{:?}", cs); } diff --git a/pallas-handshake/examples/client.rs b/pallas-handshake/examples/client.rs index 41c060c..1a2b3ab 100644 --- a/pallas-handshake/examples/client.rs +++ b/pallas-handshake/examples/client.rs @@ -15,11 +15,11 @@ fn main() { bearer.set_nodelay(true).unwrap(); bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); - let mut muxer = Multiplexer::try_setup(bearer, &vec![0]).unwrap(); + let mut muxer = Multiplexer::setup(bearer, &vec![0]).unwrap(); - let (rx, tx) = muxer.use_channel(0); + let hs_channel = muxer.use_channel(0); let versions = VersionTable::v1_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), rx, &tx).unwrap(); + let last = run_agent(Client::initial(versions), hs_channel).unwrap(); println!("{:?}", last); } diff --git a/pallas-handshake/examples/node.rs b/pallas-handshake/examples/node.rs index 3cdc7e7..1182b69 100644 --- a/pallas-handshake/examples/node.rs +++ b/pallas-handshake/examples/node.rs @@ -16,11 +16,11 @@ fn main() { bearer.set_nodelay(true).unwrap(); bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); - let mut muxer = Multiplexer::try_setup(bearer, &vec![0]).unwrap(); - let (rx, tx) = muxer.use_channel(0); + let mut muxer = Multiplexer::setup(bearer, &vec![0]).unwrap(); + let channel = muxer.use_channel(0); let versions = VersionTable::v4_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), rx, &tx).unwrap(); + let last = run_agent(Client::initial(versions), channel).unwrap(); println!("{:?}", last); } diff --git a/pallas-localstate/examples/chainpoint.rs b/pallas-localstate/examples/chainpoint.rs index b46dbe3..13d3273 100644 --- a/pallas-localstate/examples/chainpoint.rs +++ b/pallas-localstate/examples/chainpoint.rs @@ -85,15 +85,15 @@ fn main() { // path for your environment let bearer = UnixStream::connect("/tmp/node.socket").unwrap(); - let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 7]).unwrap(); + let mut muxer = Multiplexer::setup(bearer, &vec![0, 7]).unwrap(); - let (rx, tx) = muxer.use_channel(0); + let hs_channel = muxer.use_channel(0); let versions = VersionTable::only_v10(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), rx, &tx).unwrap(); + let last = run_agent(Client::initial(versions), hs_channel).unwrap(); println!("last hanshake state: {:?}", last); - let (cs_rx, cs_tx) = muxer.use_channel(7); + let ls_channel = muxer.use_channel(7); let cs = OneShotClient::::initial(None, Request::GetChainPoint); - let cs = run_agent(cs, cs_rx, &cs_tx).unwrap(); + let cs = run_agent(cs, ls_channel).unwrap(); println!("{:?}", cs); } diff --git a/pallas-machines/src/lib.rs b/pallas-machines/src/lib.rs index 9cd160d..c49796d 100644 --- a/pallas-machines/src/lib.rs +++ b/pallas-machines/src/lib.rs @@ -1,7 +1,7 @@ mod payload; use log::{debug, trace, warn}; -use pallas_multiplexer::Payload; +use pallas_multiplexer::{Channel, Payload}; use std::borrow::Borrow; use std::fmt::{Debug, Display}; use std::sync::mpsc::{Receiver, Sender}; @@ -89,9 +89,10 @@ pub trait Agent: Sized { pub fn run_agent( agent: T, - rx: Receiver, - output: &impl MachineOutput, + channel: Channel, ) -> Result> { + let Channel(tx, rx) = channel; + let mut input = PayloadDeconstructor { rx, remaining: Vec::new(), @@ -104,7 +105,7 @@ pub fn run_agent( match agent.has_agency() { true => { - agent = agent.send_next(output)?; + agent = agent.send_next(&tx)?; } false => { let msg = input.consume_next_message::()?; diff --git a/pallas-multiplexer/examples/listener.rs b/pallas-multiplexer/examples/listener.rs index c144bf8..ecdb3fd 100644 --- a/pallas-multiplexer/examples/listener.rs +++ b/pallas-multiplexer/examples/listener.rs @@ -1,6 +1,6 @@ use std::{net::TcpListener, os::unix::net::UnixListener, thread, time::Duration}; -use pallas_multiplexer::Multiplexer; +use pallas_multiplexer::{Channel, Multiplexer}; const PROTOCOLS: [u16; 2] = [0x8002u16, 0x8003u16]; @@ -11,13 +11,13 @@ fn main() { let server = UnixListener::bind("/tmp/pallas").unwrap(); let (bearer, _) = server.accept().unwrap(); - let mut muxer = Multiplexer::try_setup(bearer, &PROTOCOLS).unwrap(); + let mut muxer = Multiplexer::setup(bearer, &PROTOCOLS).unwrap(); for protocol in PROTOCOLS { let handle = muxer.use_channel(protocol); thread::spawn(move || { - let (rx, _tx) = handle; + let Channel(_, rx) = handle; loop { let payload = rx.recv().unwrap(); diff --git a/pallas-multiplexer/examples/sender.rs b/pallas-multiplexer/examples/sender.rs index c44c4f4..a04d892 100644 --- a/pallas-multiplexer/examples/sender.rs +++ b/pallas-multiplexer/examples/sender.rs @@ -1,6 +1,6 @@ use std::{net::TcpStream, os::unix::net::UnixStream, thread, time::Duration}; -use pallas_multiplexer::Multiplexer; +use pallas_multiplexer::{Channel, Multiplexer}; const PROTOCOLS: [u16; 2] = [0x0002u16, 0x0003u16]; @@ -9,13 +9,13 @@ fn main() { //let bearer = TcpStream::connect("127.0.0.1:3001").unwrap(); let bearer = UnixStream::connect("/tmp/pallas").unwrap(); - let mut muxer = Multiplexer::try_setup(bearer, &PROTOCOLS).unwrap(); + let mut muxer = Multiplexer::setup(bearer, &PROTOCOLS).unwrap(); for protocol in PROTOCOLS { let handle = muxer.use_channel(protocol); thread::spawn(move || { - let (_rx, tx) = handle; + let Channel(tx, _) = handle; loop { let payload = vec![1; 65545]; diff --git a/pallas-multiplexer/src/lib.rs b/pallas-multiplexer/src/lib.rs index 492a8b5..fda5ab0 100644 --- a/pallas-multiplexer/src/lib.rs +++ b/pallas-multiplexer/src/lib.rs @@ -109,8 +109,9 @@ where } } -type ChannelProtocolIO = (Receiver, Sender); -type ChannelProtocolHandle = (u16, ChannelProtocolIO); +pub struct Channel(pub Sender, pub Receiver); + +type ChannelProtocolHandle = (u16, Channel); type ChannelIngressHandle = (u16, Receiver); type ChannelEgressHandle = (u16, Sender); type MuxIngress = Vec; @@ -119,11 +120,11 @@ type DemuxerEgress = Vec; pub struct Multiplexer { tx_thread: JoinHandle<()>, rx_thread: JoinHandle<()>, - io_handles: HashMap, + io_handles: HashMap, } impl Multiplexer { - pub fn try_setup(bearer: TBearer, protocols: &[u16]) -> Result + pub fn setup(bearer: TBearer, protocols: &[u16]) -> Result where TBearer: Bearer + 'static, { @@ -133,8 +134,9 @@ impl Multiplexer { let (demux_tx, demux_rx) = mpsc::channel::(); let (mux_tx, mux_rx) = mpsc::channel::(); - let protocol_io = (demux_rx, mux_tx); - let protocol_handle: ChannelProtocolHandle = (*id, protocol_io); + let channel = Channel(mux_tx, demux_rx); + + let protocol_handle: ChannelProtocolHandle = (*id, channel); let ingress_handle: ChannelIngressHandle = (*id, mux_rx); let egress_handle: ChannelEgressHandle = (*id, demux_tx); @@ -152,7 +154,7 @@ impl Multiplexer { let mut rx_bearer = bearer.clone(); let rx_thread = thread::spawn(move || rx_loop(&mut rx_bearer, egress)); - let io_handles: HashMap = protocol_handles.into_iter().collect(); + let io_handles: HashMap = protocol_handles.into_iter().collect(); Ok(Multiplexer { io_handles, @@ -161,7 +163,7 @@ impl Multiplexer { }) } - pub fn use_channel(&mut self, protocol_id: u16) -> ChannelProtocolIO { + pub fn use_channel(&mut self, protocol_id: u16) -> Channel { self.io_handles.remove(&protocol_id).unwrap() } diff --git a/pallas-txsubmission/examples/naive.rs b/pallas-txsubmission/examples/naive.rs index c2f2d88..8527942 100644 --- a/pallas-txsubmission/examples/naive.rs +++ b/pallas-txsubmission/examples/naive.rs @@ -16,17 +16,17 @@ fn main() { bearer.set_nodelay(true).unwrap(); bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); - let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 4]).unwrap(); + let mut muxer = Multiplexer::setup(bearer, &vec![0, 4]).unwrap(); - let (hs_rx, hs_tx) = muxer.use_channel(0); + let hs_channel = muxer.use_channel(0); let versions = VersionTable::v1_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), hs_rx, &hs_tx).unwrap(); + let last = run_agent(Client::initial(versions), hs_channel).unwrap(); println!("{:?}", last); - let (ts_rx, ts_tx) = muxer.use_channel(4); + let ts_channel = muxer.use_channel(4); let ts = NaiveProvider::initial(vec![]); - let ts = run_agent(ts, ts_rx, &ts_tx).unwrap(); + let ts = run_agent(ts, ts_channel).unwrap(); println!("{:?}", ts); }