From 611611d0638f3b590f8ba013f22aa2a746ee29c5 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Thu, 9 Dec 2021 06:40:28 -0300 Subject: [PATCH] refactor(multiplexer): allow multiplexer channels to be sequantially shared --- pallas-blockfetch/examples/client.rs | 11 +++++------ pallas-chainsync/examples/blocks.rs | 13 ++++++------- pallas-chainsync/examples/headers.rs | 8 ++++---- pallas-handshake/examples/client.rs | 4 ++-- pallas-handshake/examples/node.rs | 4 ++-- pallas-localstate/examples/chainpoint.rs | 8 ++++---- pallas-machines/src/lib.rs | 4 ++-- pallas-machines/src/payloads.rs | 6 +++--- pallas-txsubmission/examples/naive.rs | 8 ++++---- 9 files changed, 32 insertions(+), 34 deletions(-) diff --git a/pallas-blockfetch/examples/client.rs b/pallas-blockfetch/examples/client.rs index 64e3394..b3c868a 100644 --- a/pallas-blockfetch/examples/client.rs +++ b/pallas-blockfetch/examples/client.rs @@ -3,8 +3,7 @@ use pallas_machines::primitives::Point; use std::net::TcpStream; use pallas_blockfetch::BlockFetchClient; -use pallas_handshake::n2n::{Client, VersionTable}; -use pallas_handshake::MAINNET_MAGIC; +use pallas_handshake::{MAINNET_MAGIC, n2n::{Client, VersionTable}}; use pallas_machines::run_agent; use pallas_multiplexer::Multiplexer; @@ -19,9 +18,9 @@ fn main() { let mut muxer = Multiplexer::setup(bearer, &vec![0, 3]).unwrap(); - let hs_channel = muxer.use_channel(0); + let mut hs_channel = muxer.use_channel(0); let versions = VersionTable::v4_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), hs_channel).unwrap(); + let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap(); println!("{:?}", last); let range = ( @@ -37,8 +36,8 @@ fn main() { ), ); - let bf_channel = muxer.use_channel(3); + let mut bf_channel = muxer.use_channel(3); let bf = BlockFetchClient::initial(range); - let bf_last = run_agent(bf, bf_channel); + let bf_last = run_agent(bf, &mut bf_channel); println!("{:?}", bf_last); } diff --git a/pallas-chainsync/examples/blocks.rs b/pallas-chainsync/examples/blocks.rs index 0808590..8bcf76b 100644 --- a/pallas-chainsync/examples/blocks.rs +++ b/pallas-chainsync/examples/blocks.rs @@ -1,6 +1,5 @@ use pallas_chainsync::{ClientConsumer, NoopObserver}; -use pallas_handshake::n2c::{Client, VersionTable}; -use pallas_handshake::MAINNET_MAGIC; +use pallas_handshake::{MAINNET_MAGIC, n2c::{Client, VersionTable}}; use pallas_machines::primitives::Point; use pallas_machines::run_agent; use pallas_multiplexer::Multiplexer; @@ -15,9 +14,9 @@ fn main() { let mut muxer = Multiplexer::setup(bearer, &vec![0, 4, 5]).unwrap(); - let hs_channel = muxer.use_channel(0); + let mut hs_channel = muxer.use_channel(0); let versions = VersionTable::v1_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), hs_channel).unwrap(); + let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap(); println!("last hanshake state: {:?}", last); // some random known-point in the chain to use as starting point for the sync @@ -26,8 +25,8 @@ fn main() { hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(), )]; - let cs_channel = muxer.use_channel(5); - let cs = ClientConsumer::initial(known_points, NoopObserver { }); - let cs = run_agent(cs, cs_channel).unwrap(); + let mut cs_channel = muxer.use_channel(5); + let cs = ClientConsumer::initial(known_points, NoopObserver {}); + let cs = run_agent(cs, &mut cs_channel).unwrap(); println!("{:?}", cs); } diff --git a/pallas-chainsync/examples/headers.rs b/pallas-chainsync/examples/headers.rs index 644c30d..d27e184 100644 --- a/pallas-chainsync/examples/headers.rs +++ b/pallas-chainsync/examples/headers.rs @@ -16,10 +16,10 @@ fn main() { bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); let mut muxer = Multiplexer::setup(bearer, &vec![0, 2]).unwrap(); - let hs_channel = muxer.use_channel(0); + let mut hs_channel = muxer.use_channel(0); let versions = VersionTable::v4_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), hs_channel).unwrap(); + let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap(); println!("{:?}", last); let known_points = vec![Point( @@ -27,10 +27,10 @@ fn main() { hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(), )]; - let cs_channel = muxer.use_channel(2); + let mut cs_channel = muxer.use_channel(2); let cs = ClientConsumer::initial(known_points, NoopObserver {}); - let cs = run_agent(cs, cs_channel).unwrap(); + let cs = run_agent(cs, &mut cs_channel).unwrap(); println!("{:?}", cs); } diff --git a/pallas-handshake/examples/client.rs b/pallas-handshake/examples/client.rs index 1a2b3ab..384c82c 100644 --- a/pallas-handshake/examples/client.rs +++ b/pallas-handshake/examples/client.rs @@ -17,9 +17,9 @@ fn main() { let mut muxer = Multiplexer::setup(bearer, &vec![0]).unwrap(); - let hs_channel = muxer.use_channel(0); + let mut hs_channel = muxer.use_channel(0); let versions = VersionTable::v1_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), hs_channel).unwrap(); + let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap(); println!("{:?}", last); } diff --git a/pallas-handshake/examples/node.rs b/pallas-handshake/examples/node.rs index 1182b69..a933313 100644 --- a/pallas-handshake/examples/node.rs +++ b/pallas-handshake/examples/node.rs @@ -17,10 +17,10 @@ fn main() { bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); let mut muxer = Multiplexer::setup(bearer, &vec![0]).unwrap(); - let channel = muxer.use_channel(0); + let mut channel = muxer.use_channel(0); let versions = VersionTable::v4_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), channel).unwrap(); + let last = run_agent(Client::initial(versions), &mut channel).unwrap(); println!("{:?}", last); } diff --git a/pallas-localstate/examples/chainpoint.rs b/pallas-localstate/examples/chainpoint.rs index c939d6b..0b13efd 100644 --- a/pallas-localstate/examples/chainpoint.rs +++ b/pallas-localstate/examples/chainpoint.rs @@ -16,14 +16,14 @@ fn main() { let mut muxer = Multiplexer::setup(bearer, &vec![0, 7]).unwrap(); - let hs_channel = muxer.use_channel(0); + let mut hs_channel = muxer.use_channel(0); let versions = VersionTable::only_v10(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), hs_channel).unwrap(); + let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap(); println!("last hanshake state: {:?}", last); - let ls_channel = muxer.use_channel(7); + let mut ls_channel = muxer.use_channel(7); let cs = OneShotClient::::initial(None, RequestV10::GetChainPoint); - let cs = run_agent(cs, ls_channel).unwrap(); + let cs = run_agent(cs, &mut ls_channel).unwrap(); println!("{:?}", cs); } diff --git a/pallas-machines/src/lib.rs b/pallas-machines/src/lib.rs index bce1813..dbde2df 100644 --- a/pallas-machines/src/lib.rs +++ b/pallas-machines/src/lib.rs @@ -91,7 +91,7 @@ pub trait Agent: Sized { pub fn run_agent( agent: T, - channel: Channel, + channel: &mut Channel, ) -> Result> { let Channel(tx, rx) = channel; @@ -107,7 +107,7 @@ pub fn run_agent( match agent.has_agency() { true => { - agent = agent.send_next(&tx)?; + agent = agent.send_next(tx)?; } false => { let msg = input.consume_next_message::()?; diff --git a/pallas-machines/src/payloads.rs b/pallas-machines/src/payloads.rs index daac63d..cd1b973 100644 --- a/pallas-machines/src/payloads.rs +++ b/pallas-machines/src/payloads.rs @@ -112,12 +112,12 @@ impl DecodePayload for Option { } } -pub struct PayloadDeconstructor { - pub(crate) rx: Receiver, +pub struct PayloadDeconstructor<'a> { + pub(crate) rx: &'a mut Receiver, pub(crate) remaining: Vec, } -impl PayloadDeconstructor { +impl<'a> PayloadDeconstructor<'a> { pub fn consume_next_message( &mut self, ) -> Result> { diff --git a/pallas-txsubmission/examples/naive.rs b/pallas-txsubmission/examples/naive.rs index 8527942..2449d2a 100644 --- a/pallas-txsubmission/examples/naive.rs +++ b/pallas-txsubmission/examples/naive.rs @@ -18,15 +18,15 @@ fn main() { let mut muxer = Multiplexer::setup(bearer, &vec![0, 4]).unwrap(); - let hs_channel = muxer.use_channel(0); + let mut hs_channel = muxer.use_channel(0); let versions = VersionTable::v1_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), hs_channel).unwrap(); + let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap(); println!("{:?}", last); - let ts_channel = muxer.use_channel(4); + let mut ts_channel = muxer.use_channel(4); let ts = NaiveProvider::initial(vec![]); - let ts = run_agent(ts, ts_channel).unwrap(); + let ts = run_agent(ts, &mut ts_channel).unwrap(); println!("{:?}", ts); }