diff --git a/pallas-blockfetch/examples/client.rs b/pallas-blockfetch/examples/client.rs index ebd9a5d..8eb670d 100644 --- a/pallas-blockfetch/examples/client.rs +++ b/pallas-blockfetch/examples/client.rs @@ -11,15 +11,14 @@ fn main() { env_logger::init(); //let bearer = TcpStream::connect("localhost:6000").unwrap(); - let bearer = - TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); + let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); bearer.set_nodelay(true).unwrap(); bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); - let mut handles = Multiplexer::new(bearer, &vec![0, 3]).unwrap(); - let (_, rx, tx) = handles.remove(0); + let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 3]).unwrap(); + let (rx, tx) = muxer.use_channel(0); let versions = VersionTable::v4_and_above(MAINNET_MAGIC); let last = run_agent(Client::initial(versions), rx, &tx).unwrap(); println!("{:?}", last); @@ -37,11 +36,8 @@ fn main() { ), ); - let (_, bf_rx, bf_tx) = handles.remove(0); - + let (bf_rx, bf_tx) = muxer.use_channel(3); let bf = BlockFetchClient::initial(range); - let bf_last = run_agent(bf, bf_rx, &bf_tx); - println!("{:?}", bf_last); } diff --git a/pallas-chainsync/examples/blocks.rs b/pallas-chainsync/examples/blocks.rs index a8c9281..1086503 100644 --- a/pallas-chainsync/examples/blocks.rs +++ b/pallas-chainsync/examples/blocks.rs @@ -19,14 +19,14 @@ fn main() { bearer.set_nodelay(true).unwrap(); bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); - let mut handles = Multiplexer::new(bearer, &vec![0, 4, 5]).unwrap(); + let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 4, 5]).unwrap(); - let (_, rx, tx) = handles.remove(0); + let (rx, tx) = muxer.use_channel(0); let versions = VersionTable::v1_and_above(MAINNET_MAGIC); let last = run_agent(Client::initial(versions), rx, &tx).unwrap(); println!("last hanshake state: {:?}", last); - let (_, ts_rx, ts_tx) = handles.remove(0); + let (ts_rx, ts_tx) = muxer.use_channel(4); let ts = NaiveProvider::initial(vec![]); let ts = run_agent(ts, ts_rx, &ts_tx).unwrap(); println!("last tx-submission state: {:?}", ts); @@ -36,7 +36,7 @@ fn main() { hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(), )]; - let (_, cs_rx, cs_tx) = handles.remove(0); + let (cs_rx, cs_tx) = muxer.use_channel(5); let cs = Consumer::initial(known_points); let cs = run_agent(cs, cs_rx, &cs_tx).unwrap(); println!("{:?}", cs); diff --git a/pallas-chainsync/examples/headers.rs b/pallas-chainsync/examples/headers.rs index 9800162..e314f92 100644 --- a/pallas-chainsync/examples/headers.rs +++ b/pallas-chainsync/examples/headers.rs @@ -16,8 +16,8 @@ fn main() { bearer.set_nodelay(true).unwrap(); bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); - let mut handles = Multiplexer::new(bearer, &vec![0, 2]).unwrap(); - let (_, rx, tx) = handles.remove(0); + let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 2]).unwrap(); + let (rx, tx) = muxer.use_channel(0); let versions = VersionTable::v4_and_above(MAINNET_MAGIC); let last = run_agent(Client::initial(versions), rx, &tx).unwrap(); @@ -28,7 +28,7 @@ fn main() { hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(), )]; - let (_, cs_rx, cs_tx) = handles.remove(0); + let (cs_rx, cs_tx) = muxer.use_channel(2); let cs = Consumer::initial(known_points); let cs = run_agent(cs, cs_rx, &cs_tx).unwrap(); diff --git a/pallas-chainsync/src/lib.rs b/pallas-chainsync/src/lib.rs index e154ce0..d2143f9 100644 --- a/pallas-chainsync/src/lib.rs +++ b/pallas-chainsync/src/lib.rs @@ -1,7 +1,7 @@ use std::fmt::Debug; use log::{debug, log_enabled, trace, warn}; -use minicbor::encode; + use pallas_machines::{ Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder, PayloadEncoder, Transition, diff --git a/pallas-handshake/examples/client.rs b/pallas-handshake/examples/client.rs index 054d0be..41c060c 100644 --- a/pallas-handshake/examples/client.rs +++ b/pallas-handshake/examples/client.rs @@ -10,15 +10,14 @@ fn main() { env_logger::init(); //let bearer = TcpStream::connect("localhost:6000").unwrap(); - let bearer = - TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); + let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); bearer.set_nodelay(true).unwrap(); bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); - let mut handles = Multiplexer::new(bearer, &vec![0]).unwrap(); - let (_, rx, tx) = handles.remove(0); + let mut muxer = Multiplexer::try_setup(bearer, &vec![0]).unwrap(); + let (rx, tx) = muxer.use_channel(0); let versions = VersionTable::v1_and_above(MAINNET_MAGIC); let last = run_agent(Client::initial(versions), rx, &tx).unwrap(); diff --git a/pallas-handshake/examples/node.rs b/pallas-handshake/examples/node.rs index 88aade0..3cdc7e7 100644 --- a/pallas-handshake/examples/node.rs +++ b/pallas-handshake/examples/node.rs @@ -16,8 +16,8 @@ fn main() { bearer.set_nodelay(true).unwrap(); bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); - let mut handles = Multiplexer::new(bearer, &vec![0]).unwrap(); - let (_, rx, tx) = handles.remove(0); + let mut muxer = Multiplexer::try_setup(bearer, &vec![0]).unwrap(); + let (rx, tx) = muxer.use_channel(0); let versions = VersionTable::v4_and_above(MAINNET_MAGIC); let last = run_agent(Client::initial(versions), rx, &tx).unwrap(); diff --git a/pallas-handshake/src/n2c.rs b/pallas-handshake/src/n2c.rs index cab38ca..7961470 100644 --- a/pallas-handshake/src/n2c.rs +++ b/pallas-handshake/src/n2c.rs @@ -1,7 +1,6 @@ use core::panic; use std::collections::HashMap; -use itertools::Merge; use pallas_machines::{ Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder, PayloadEncoder, diff --git a/pallas-multiplexer/examples/listener.rs b/pallas-multiplexer/examples/listener.rs index 9b452da..c985236 100644 --- a/pallas-multiplexer/examples/listener.rs +++ b/pallas-multiplexer/examples/listener.rs @@ -2,21 +2,25 @@ use std::{net::TcpListener, thread, time::Duration}; use pallas_multiplexer::Multiplexer; +const PROTOCOLS: [u16; 2] = [0x8002u16, 0x8003u16]; + fn main() { env_logger::init(); let server = TcpListener::bind("0.0.0.0:3001").unwrap(); let (bearer, _) = server.accept().unwrap(); - let handles = - Multiplexer::new(bearer, &vec![0x8002u16, 0x8003u16][..]).unwrap(); + let mut muxer = Multiplexer::try_setup(bearer, &PROTOCOLS).unwrap(); - for handle in handles { + for protocol in PROTOCOLS { + let handle = muxer.use_channel(protocol); + thread::spawn(move || { - let (id, rx, tx) = handle; + let (rx, _tx) = handle; + loop { let payload = rx.recv().unwrap(); - println!("id:{}, length:{}", id, payload.len()); + println!("id:{}, length:{}", protocol, payload.len()); } }); } diff --git a/pallas-multiplexer/examples/sender.rs b/pallas-multiplexer/examples/sender.rs index 1788ecd..d41516c 100644 --- a/pallas-multiplexer/examples/sender.rs +++ b/pallas-multiplexer/examples/sender.rs @@ -2,22 +2,25 @@ use std::{net::TcpStream, thread, time::Duration}; use pallas_multiplexer::Multiplexer; +const PROTOCOLS: [u16; 2] = [0x0002u16, 0x0003u16]; + fn main() { env_logger::init(); let bearer = TcpStream::connect("127.0.0.1:3001").unwrap(); - let handles = - Multiplexer::new(bearer, &vec![0x0002u16, 0x0003u16][..]).unwrap(); + let mut muxer = Multiplexer::try_setup(bearer, &PROTOCOLS).unwrap(); + + for protocol in PROTOCOLS { + let handle = muxer.use_channel(protocol); - for (idx, handle) in handles.into_iter().enumerate() { thread::spawn(move || { - let (id, rx, tx) = handle; + let (_rx, tx) = handle; loop { let payload = vec![1; 65545]; tx.send(payload).unwrap(); thread::sleep(Duration::from_millis( - 50u64 + (idx as u64 * 10u64), + 50u64 + (protocol as u64 * 10u64), )); } }); diff --git a/pallas-multiplexer/src/lib.rs b/pallas-multiplexer/src/lib.rs index 47a3b03..3097d2f 100644 --- a/pallas-multiplexer/src/lib.rs +++ b/pallas-multiplexer/src/lib.rs @@ -1,9 +1,10 @@ use std::{ + borrow::Borrow, collections::HashMap, io::{Read, Write}, net::TcpStream, sync::mpsc::{self, Receiver, Sender, TryRecvError}, - thread, + thread::{self, JoinHandle}, time::{Duration, Instant}, }; @@ -52,7 +53,7 @@ impl Bearer for TcpStream { fn read_segment(&mut self) -> Result<(u16, u32, Payload), std::io::Error> { let mut header = [0u8; 8]; - + self.read_exact(&mut header)?; if log_enabled!(log::Level::Trace) { @@ -63,7 +64,10 @@ impl Bearer for TcpStream { let id = NetworkEndian::read_u16(&mut header[4..6]) as usize ^ 0x8000; let ts = NetworkEndian::read_u32(&mut header[0..4]); - debug!("parsed inbound msg, protocol id: {}, ts: {}, payload length: {}", id, ts, length); + debug!( + "parsed inbound msg, protocol id: {}, ts: {}, payload length: {}", + id, ts, length + ); let mut payload = vec![0u8; length]; self.read_exact(&mut payload)?; @@ -166,19 +170,21 @@ where } } -type ChannelProtocolHandle = (u16, Receiver, Sender); +type ChannelProtocolIO = (Receiver, Sender); +type ChannelProtocolHandle = (u16, ChannelProtocolIO); type ChannelIngressHandle = (u16, Receiver); type ChannelEgressHandle = (u16, Sender); type MuxIngress = Vec; type DemuxerEgress = Vec; -pub struct Multiplexer {} +pub struct Multiplexer { + tx_thread: JoinHandle<()>, + rx_thread: JoinHandle<()>, + io_handles: HashMap, +} impl Multiplexer { - pub fn new( - bearer: TBearer, - protocols: &[u16], - ) -> Result, Error> + pub fn try_setup(bearer: TBearer, protocols: &[u16]) -> Result where TBearer: Bearer + 'static, { @@ -188,7 +194,8 @@ impl Multiplexer { let (demux_tx, demux_rx) = mpsc::channel::(); let (mux_tx, mux_rx) = mpsc::channel::(); - let protocol_handle: ChannelProtocolHandle = (*id, demux_rx, mux_tx); + let protocol_io = (demux_rx, mux_tx); + let protocol_handle: ChannelProtocolHandle = (*id, protocol_io); let ingress_handle: ChannelIngressHandle = (*id, mux_rx); let egress_handle: ChannelEgressHandle = (*id, demux_tx); @@ -201,11 +208,29 @@ impl Multiplexer { let (ingress, egress): (Vec<_>, Vec<_>) = multiplex_handles.into_iter().unzip(); let mut tx_bearer = bearer.clone(); - thread::spawn(move || tx_loop(&mut tx_bearer, ingress)); + let tx_thread = thread::spawn(move || tx_loop(&mut tx_bearer, ingress)); let mut rx_bearer = bearer.clone(); - thread::spawn(move || rx_loop(&mut rx_bearer, egress)); + let rx_thread = thread::spawn(move || rx_loop(&mut rx_bearer, egress)); - Ok(protocol_handles) + let io_handles: HashMap = protocol_handles.into_iter().collect(); + + Ok(Multiplexer { + io_handles, + tx_thread, + rx_thread, + }) + } + + pub fn use_channel(&mut self, protocol_id: u16) -> ChannelProtocolIO { + self + .io_handles + .remove(&protocol_id) + .unwrap() + } + + pub fn join(self) { + self.tx_thread.join().unwrap(); + self.rx_thread.join().unwrap(); } } diff --git a/pallas-txsubmission/examples/naive.rs b/pallas-txsubmission/examples/naive.rs index 806e836..c2f2d88 100644 --- a/pallas-txsubmission/examples/naive.rs +++ b/pallas-txsubmission/examples/naive.rs @@ -16,16 +16,15 @@ fn main() { bearer.set_nodelay(true).unwrap(); bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); - let mut handles = Multiplexer::new(bearer, &vec![0, 4]).unwrap(); - let (_, rx, tx) = handles.remove(0); + let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 4]).unwrap(); + let (hs_rx, hs_tx) = muxer.use_channel(0); let versions = VersionTable::v1_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), rx, &tx).unwrap(); + let last = run_agent(Client::initial(versions), hs_rx, &hs_tx).unwrap(); println!("{:?}", last); - let (_, ts_rx, ts_tx) = handles.remove(0); - + let (ts_rx, ts_tx) = muxer.use_channel(4); let ts = NaiveProvider::initial(vec![]); let ts = run_agent(ts, ts_rx, &ts_tx).unwrap();