refactor(multiplexer): allow multiplexer channels to be sequantially shared

This commit is contained in:
Santiago Carmuega 2021-12-09 06:40:28 -03:00
parent 451751109f
commit 611611d063
9 changed files with 32 additions and 34 deletions

View file

@ -3,8 +3,7 @@ use pallas_machines::primitives::Point;
use std::net::TcpStream;
use pallas_blockfetch::BlockFetchClient;
use pallas_handshake::n2n::{Client, VersionTable};
use pallas_handshake::MAINNET_MAGIC;
use pallas_handshake::{MAINNET_MAGIC, n2n::{Client, VersionTable}};
use pallas_machines::run_agent;
use pallas_multiplexer::Multiplexer;
@ -19,9 +18,9 @@ fn main() {
let mut muxer = Multiplexer::setup(bearer, &vec![0, 3]).unwrap();
let hs_channel = muxer.use_channel(0);
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), hs_channel).unwrap();
let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap();
println!("{:?}", last);
let range = (
@ -37,8 +36,8 @@ fn main() {
),
);
let bf_channel = muxer.use_channel(3);
let mut bf_channel = muxer.use_channel(3);
let bf = BlockFetchClient::initial(range);
let bf_last = run_agent(bf, bf_channel);
let bf_last = run_agent(bf, &mut bf_channel);
println!("{:?}", bf_last);
}

View file

@ -1,6 +1,5 @@
use pallas_chainsync::{ClientConsumer, NoopObserver};
use pallas_handshake::n2c::{Client, VersionTable};
use pallas_handshake::MAINNET_MAGIC;
use pallas_handshake::{MAINNET_MAGIC, n2c::{Client, VersionTable}};
use pallas_machines::primitives::Point;
use pallas_machines::run_agent;
use pallas_multiplexer::Multiplexer;
@ -15,9 +14,9 @@ fn main() {
let mut muxer = Multiplexer::setup(bearer, &vec![0, 4, 5]).unwrap();
let hs_channel = muxer.use_channel(0);
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::v1_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), hs_channel).unwrap();
let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap();
println!("last hanshake state: {:?}", last);
// some random known-point in the chain to use as starting point for the sync
@ -26,8 +25,8 @@ fn main() {
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(),
)];
let cs_channel = muxer.use_channel(5);
let cs = ClientConsumer::initial(known_points, NoopObserver { });
let cs = run_agent(cs, cs_channel).unwrap();
let mut cs_channel = muxer.use_channel(5);
let cs = ClientConsumer::initial(known_points, NoopObserver {});
let cs = run_agent(cs, &mut cs_channel).unwrap();
println!("{:?}", cs);
}

View file

@ -16,10 +16,10 @@ fn main() {
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut muxer = Multiplexer::setup(bearer, &vec![0, 2]).unwrap();
let hs_channel = muxer.use_channel(0);
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), hs_channel).unwrap();
let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap();
println!("{:?}", last);
let known_points = vec![Point(
@ -27,10 +27,10 @@ fn main() {
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(),
)];
let cs_channel = muxer.use_channel(2);
let mut cs_channel = muxer.use_channel(2);
let cs = ClientConsumer::initial(known_points, NoopObserver {});
let cs = run_agent(cs, cs_channel).unwrap();
let cs = run_agent(cs, &mut cs_channel).unwrap();
println!("{:?}", cs);
}

View file

@ -17,9 +17,9 @@ fn main() {
let mut muxer = Multiplexer::setup(bearer, &vec![0]).unwrap();
let hs_channel = muxer.use_channel(0);
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::v1_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), hs_channel).unwrap();
let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap();
println!("{:?}", last);
}

View file

@ -17,10 +17,10 @@ fn main() {
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();
let mut muxer = Multiplexer::setup(bearer, &vec![0]).unwrap();
let channel = muxer.use_channel(0);
let mut channel = muxer.use_channel(0);
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), channel).unwrap();
let last = run_agent(Client::initial(versions), &mut channel).unwrap();
println!("{:?}", last);
}

View file

@ -16,14 +16,14 @@ fn main() {
let mut muxer = Multiplexer::setup(bearer, &vec![0, 7]).unwrap();
let hs_channel = muxer.use_channel(0);
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::only_v10(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), hs_channel).unwrap();
let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap();
println!("last hanshake state: {:?}", last);
let ls_channel = muxer.use_channel(7);
let mut ls_channel = muxer.use_channel(7);
let cs = OneShotClient::<QueryV10>::initial(None, RequestV10::GetChainPoint);
let cs = run_agent(cs, ls_channel).unwrap();
let cs = run_agent(cs, &mut ls_channel).unwrap();
println!("{:?}", cs);
}

View file

@ -91,7 +91,7 @@ pub trait Agent: Sized {
pub fn run_agent<T: Agent + Debug>(
agent: T,
channel: Channel,
channel: &mut Channel,
) -> Result<T, Box<dyn std::error::Error>> {
let Channel(tx, rx) = channel;
@ -107,7 +107,7 @@ pub fn run_agent<T: Agent + Debug>(
match agent.has_agency() {
true => {
agent = agent.send_next(&tx)?;
agent = agent.send_next(tx)?;
}
false => {
let msg = input.consume_next_message::<T::Message>()?;

View file

@ -112,12 +112,12 @@ impl<T: DecodePayload> DecodePayload for Option<T> {
}
}
pub struct PayloadDeconstructor {
pub(crate) rx: Receiver<Payload>,
pub struct PayloadDeconstructor<'a> {
pub(crate) rx: &'a mut Receiver<Payload>,
pub(crate) remaining: Vec<u8>,
}
impl PayloadDeconstructor {
impl<'a> PayloadDeconstructor<'a> {
pub fn consume_next_message<T: DecodePayload>(
&mut self,
) -> Result<T, Box<dyn std::error::Error>> {

View file

@ -18,15 +18,15 @@ fn main() {
let mut muxer = Multiplexer::setup(bearer, &vec![0, 4]).unwrap();
let hs_channel = muxer.use_channel(0);
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::v1_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), hs_channel).unwrap();
let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap();
println!("{:?}", last);
let ts_channel = muxer.use_channel(4);
let mut ts_channel = muxer.use_channel(4);
let ts = NaiveProvider::initial(vec![]);
let ts = run_agent(ts, ts_channel).unwrap();
let ts = run_agent(ts, &mut ts_channel).unwrap();
println!("{:?}", ts);
}