Tidy up multiplexer api

This commit is contained in:
Santiago Carmuega 2021-11-28 17:03:38 -03:00
parent b07a1fa7e6
commit 0f4f98dd3d
11 changed files with 52 additions and 50 deletions

View file

@ -16,11 +16,11 @@ fn main() {
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 3]).unwrap();
let mut muxer = Multiplexer::setup(bearer, &vec![0, 3]).unwrap();
let (rx, tx) = muxer.use_channel(0);
let hs_channel = muxer.use_channel(0);
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), rx, &tx).unwrap();
let last = run_agent(Client::initial(versions), hs_channel).unwrap();
println!("{:?}", last);
let range = (
@ -36,8 +36,8 @@ fn main() {
),
);
let (bf_rx, bf_tx) = muxer.use_channel(3);
let bf_channel = muxer.use_channel(3);
let bf = BlockFetchClient::initial(range);
let bf_last = run_agent(bf, bf_rx, &bf_tx);
let bf_last = run_agent(bf, bf_channel);
println!("{:?}", bf_last);
}

View file

@ -12,11 +12,11 @@ fn main() {
// path for your environment
let bearer = UnixStream::connect("/tmp/node.socket").unwrap();
let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 4, 5]).unwrap();
let mut muxer = Multiplexer::setup(bearer, &vec![0, 4, 5]).unwrap();
let (rx, tx) = muxer.use_channel(0);
let hs_channel = muxer.use_channel(0);
let versions = VersionTable::v1_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), rx, &tx).unwrap();
let last = run_agent(Client::initial(versions), hs_channel).unwrap();
println!("last hanshake state: {:?}", last);
// some random known-point in the chain to use as starting point for the sync
@ -25,8 +25,8 @@ fn main() {
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(),
)];
let (cs_rx, cs_tx) = muxer.use_channel(5);
let cs_channel = muxer.use_channel(5);
let cs = ClientConsumer::initial(known_points, NoopStorage { });
let cs = run_agent(cs, cs_rx, &cs_tx).unwrap();
let cs = run_agent(cs, cs_channel).unwrap();
println!("{:?}", cs);
}

View file

@ -11,15 +11,14 @@ fn main() {
env_logger::init();
let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 2]).unwrap();
let (rx, tx) = muxer.use_channel(0);
let mut muxer = Multiplexer::setup(bearer, &vec![0, 2]).unwrap();
let hs_channel = muxer.use_channel(0);
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), rx, &tx).unwrap();
let last = run_agent(Client::initial(versions), hs_channel).unwrap();
println!("{:?}", last);
let known_points = vec![Point(
@ -27,10 +26,10 @@ fn main() {
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(),
)];
let (cs_rx, cs_tx) = muxer.use_channel(2);
let cs_channel = muxer.use_channel(2);
let cs = ClientConsumer::initial(known_points, NoopStorage {});
let cs = run_agent(cs, cs_rx, &cs_tx).unwrap();
let cs = run_agent(cs, cs_channel).unwrap();
println!("{:?}", cs);
}

View file

@ -15,11 +15,11 @@ fn main() {
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut muxer = Multiplexer::try_setup(bearer, &vec![0]).unwrap();
let mut muxer = Multiplexer::setup(bearer, &vec![0]).unwrap();
let (rx, tx) = muxer.use_channel(0);
let hs_channel = muxer.use_channel(0);
let versions = VersionTable::v1_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), rx, &tx).unwrap();
let last = run_agent(Client::initial(versions), hs_channel).unwrap();
println!("{:?}", last);
}

View file

@ -16,11 +16,11 @@ fn main() {
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut muxer = Multiplexer::try_setup(bearer, &vec![0]).unwrap();
let (rx, tx) = muxer.use_channel(0);
let mut muxer = Multiplexer::setup(bearer, &vec![0]).unwrap();
let channel = muxer.use_channel(0);
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), rx, &tx).unwrap();
let last = run_agent(Client::initial(versions), channel).unwrap();
println!("{:?}", last);
}

View file

@ -85,15 +85,15 @@ fn main() {
// path for your environment
let bearer = UnixStream::connect("/tmp/node.socket").unwrap();
let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 7]).unwrap();
let mut muxer = Multiplexer::setup(bearer, &vec![0, 7]).unwrap();
let (rx, tx) = muxer.use_channel(0);
let hs_channel = muxer.use_channel(0);
let versions = VersionTable::only_v10(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), rx, &tx).unwrap();
let last = run_agent(Client::initial(versions), hs_channel).unwrap();
println!("last hanshake state: {:?}", last);
let (cs_rx, cs_tx) = muxer.use_channel(7);
let ls_channel = muxer.use_channel(7);
let cs = OneShotClient::<ShelleyQuery>::initial(None, Request::GetChainPoint);
let cs = run_agent(cs, cs_rx, &cs_tx).unwrap();
let cs = run_agent(cs, ls_channel).unwrap();
println!("{:?}", cs);
}

View file

@ -1,7 +1,7 @@
mod payload;
use log::{debug, trace, warn};
use pallas_multiplexer::Payload;
use pallas_multiplexer::{Channel, Payload};
use std::borrow::Borrow;
use std::fmt::{Debug, Display};
use std::sync::mpsc::{Receiver, Sender};
@ -89,9 +89,10 @@ pub trait Agent: Sized {
pub fn run_agent<T: Agent + Debug>(
agent: T,
rx: Receiver<Payload>,
output: &impl MachineOutput,
channel: Channel,
) -> Result<T, Box<dyn std::error::Error>> {
let Channel(tx, rx) = channel;
let mut input = PayloadDeconstructor {
rx,
remaining: Vec::new(),
@ -104,7 +105,7 @@ pub fn run_agent<T: Agent + Debug>(
match agent.has_agency() {
true => {
agent = agent.send_next(output)?;
agent = agent.send_next(&tx)?;
}
false => {
let msg = input.consume_next_message::<T::Message>()?;

View file

@ -1,6 +1,6 @@
use std::{net::TcpListener, os::unix::net::UnixListener, thread, time::Duration};
use pallas_multiplexer::Multiplexer;
use pallas_multiplexer::{Channel, Multiplexer};
const PROTOCOLS: [u16; 2] = [0x8002u16, 0x8003u16];
@ -11,13 +11,13 @@ fn main() {
let server = UnixListener::bind("/tmp/pallas").unwrap();
let (bearer, _) = server.accept().unwrap();
let mut muxer = Multiplexer::try_setup(bearer, &PROTOCOLS).unwrap();
let mut muxer = Multiplexer::setup(bearer, &PROTOCOLS).unwrap();
for protocol in PROTOCOLS {
let handle = muxer.use_channel(protocol);
thread::spawn(move || {
let (rx, _tx) = handle;
let Channel(_, rx) = handle;
loop {
let payload = rx.recv().unwrap();

View file

@ -1,6 +1,6 @@
use std::{net::TcpStream, os::unix::net::UnixStream, thread, time::Duration};
use pallas_multiplexer::Multiplexer;
use pallas_multiplexer::{Channel, Multiplexer};
const PROTOCOLS: [u16; 2] = [0x0002u16, 0x0003u16];
@ -9,13 +9,13 @@ fn main() {
//let bearer = TcpStream::connect("127.0.0.1:3001").unwrap();
let bearer = UnixStream::connect("/tmp/pallas").unwrap();
let mut muxer = Multiplexer::try_setup(bearer, &PROTOCOLS).unwrap();
let mut muxer = Multiplexer::setup(bearer, &PROTOCOLS).unwrap();
for protocol in PROTOCOLS {
let handle = muxer.use_channel(protocol);
thread::spawn(move || {
let (_rx, tx) = handle;
let Channel(tx, _) = handle;
loop {
let payload = vec![1; 65545];

View file

@ -109,8 +109,9 @@ where
}
}
type ChannelProtocolIO = (Receiver<Payload>, Sender<Payload>);
type ChannelProtocolHandle = (u16, ChannelProtocolIO);
pub struct Channel(pub Sender<Payload>, pub Receiver<Payload>);
type ChannelProtocolHandle = (u16, Channel);
type ChannelIngressHandle = (u16, Receiver<Payload>);
type ChannelEgressHandle = (u16, Sender<Payload>);
type MuxIngress = Vec<ChannelIngressHandle>;
@ -119,11 +120,11 @@ type DemuxerEgress = Vec<ChannelEgressHandle>;
pub struct Multiplexer {
tx_thread: JoinHandle<()>,
rx_thread: JoinHandle<()>,
io_handles: HashMap<u16, ChannelProtocolIO>,
io_handles: HashMap<u16, Channel>,
}
impl Multiplexer {
pub fn try_setup<TBearer>(bearer: TBearer, protocols: &[u16]) -> Result<Multiplexer, Error>
pub fn setup<TBearer>(bearer: TBearer, protocols: &[u16]) -> Result<Multiplexer, Error>
where
TBearer: Bearer + 'static,
{
@ -133,8 +134,9 @@ impl Multiplexer {
let (demux_tx, demux_rx) = mpsc::channel::<Payload>();
let (mux_tx, mux_rx) = mpsc::channel::<Payload>();
let protocol_io = (demux_rx, mux_tx);
let protocol_handle: ChannelProtocolHandle = (*id, protocol_io);
let channel = Channel(mux_tx, demux_rx);
let protocol_handle: ChannelProtocolHandle = (*id, channel);
let ingress_handle: ChannelIngressHandle = (*id, mux_rx);
let egress_handle: ChannelEgressHandle = (*id, demux_tx);
@ -152,7 +154,7 @@ impl Multiplexer {
let mut rx_bearer = bearer.clone();
let rx_thread = thread::spawn(move || rx_loop(&mut rx_bearer, egress));
let io_handles: HashMap<u16, ChannelProtocolIO> = protocol_handles.into_iter().collect();
let io_handles: HashMap<u16, Channel> = protocol_handles.into_iter().collect();
Ok(Multiplexer {
io_handles,
@ -161,7 +163,7 @@ impl Multiplexer {
})
}
pub fn use_channel(&mut self, protocol_id: u16) -> ChannelProtocolIO {
pub fn use_channel(&mut self, protocol_id: u16) -> Channel {
self.io_handles.remove(&protocol_id).unwrap()
}

View file

@ -16,17 +16,17 @@ fn main() {
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 4]).unwrap();
let mut muxer = Multiplexer::setup(bearer, &vec![0, 4]).unwrap();
let (hs_rx, hs_tx) = muxer.use_channel(0);
let hs_channel = muxer.use_channel(0);
let versions = VersionTable::v1_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), hs_rx, &hs_tx).unwrap();
let last = run_agent(Client::initial(versions), hs_channel).unwrap();
println!("{:?}", last);
let (ts_rx, ts_tx) = muxer.use_channel(4);
let ts_channel = muxer.use_channel(4);
let ts = NaiveProvider::initial(vec![]);
let ts = run_agent(ts, ts_rx, &ts_tx).unwrap();
let ts = run_agent(ts, ts_channel).unwrap();
println!("{:?}", ts);
}