From 4faf9a124de58254a31c52587fcd9ad816448520 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Tue, 22 Mar 2022 23:27:20 -0300 Subject: [PATCH] refactor(miniprotocols): Use pure functions for state machines (#84) --- examples/block-download/src/main.rs | 4 +- pallas-miniprotocols/examples/blockfetch.rs | 4 +- .../examples/chainsync-blocks.rs | 4 +- .../examples/chainsync-headers.rs | 4 +- .../examples/handshake-client.rs | 4 +- .../examples/handshake-node.rs | 4 +- .../examples/localstate-chainpoint.rs | 4 +- .../examples/txsubmission-naive.rs | 4 +- pallas-miniprotocols/src/blockfetch/mod.rs | 144 +++++++++++------ .../src/chainsync/{clients.rs => agents.rs} | 147 +++++++----------- pallas-miniprotocols/src/chainsync/mod.rs | 4 +- pallas-miniprotocols/src/handshake/agents.rs | 91 +++++++++++ pallas-miniprotocols/src/handshake/mod.rs | 6 +- pallas-miniprotocols/src/handshake/n2c.rs | 135 +--------------- pallas-miniprotocols/src/handshake/n2n.rs | 143 +---------------- .../src/handshake/{common.rs => protocol.rs} | 83 +++++++++- pallas-miniprotocols/src/localstate/mod.rs | 83 ++++------ pallas-miniprotocols/src/machines.rs | 19 ++- pallas-miniprotocols/src/txsubmission/mod.rs | 87 +++++------ pallas-multiplexer/src/bearers.rs | 3 +- 20 files changed, 446 insertions(+), 531 deletions(-) rename pallas-miniprotocols/src/chainsync/{clients.rs => agents.rs} (70%) create mode 100644 pallas-miniprotocols/src/handshake/agents.rs rename pallas-miniprotocols/src/handshake/{common.rs => protocol.rs} (56%) diff --git a/examples/block-download/src/main.rs b/examples/block-download/src/main.rs index 95d2176..6fb45e8 100644 --- a/examples/block-download/src/main.rs +++ b/examples/block-download/src/main.rs @@ -2,7 +2,7 @@ use net2::TcpStreamExt; use pallas::network::{ miniprotocols::{ - handshake::n2n::{Client, VersionTable}, + handshake::{n2n::VersionTable, Initiator}, run_agent, Point, MAINNET_MAGIC, }, multiplexer::Multiplexer, @@ -34,7 +34,7 @@ fn main() { let mut hs_channel = muxer.use_channel(0); let versions = VersionTable::v4_and_above(MAINNET_MAGIC); - let _last = run_agent(Client::initial(versions), &mut hs_channel).unwrap(); + let _last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap(); let range = ( Point::Specific( diff --git a/pallas-miniprotocols/examples/blockfetch.rs b/pallas-miniprotocols/examples/blockfetch.rs index 2abbe5e..5d28991 100644 --- a/pallas-miniprotocols/examples/blockfetch.rs +++ b/pallas-miniprotocols/examples/blockfetch.rs @@ -2,7 +2,7 @@ use net2::TcpStreamExt; use std::net::TcpStream; use pallas_miniprotocols::blockfetch::{BatchClient, NoopObserver}; -use pallas_miniprotocols::handshake::n2n::{Client, VersionTable}; +use pallas_miniprotocols::handshake::{n2n::VersionTable, Initiator}; use pallas_miniprotocols::{run_agent, Point, MAINNET_MAGIC}; use pallas_multiplexer::Multiplexer; @@ -19,7 +19,7 @@ fn main() { let mut hs_channel = muxer.use_channel(0); let versions = VersionTable::v4_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap(); + let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap(); println!("{:?}", last); let range = ( diff --git a/pallas-miniprotocols/examples/chainsync-blocks.rs b/pallas-miniprotocols/examples/chainsync-blocks.rs index 4d3f3ec..5dad1de 100644 --- a/pallas-miniprotocols/examples/chainsync-blocks.rs +++ b/pallas-miniprotocols/examples/chainsync-blocks.rs @@ -1,5 +1,5 @@ use pallas_miniprotocols::chainsync::{BlockContent, Consumer, NoopObserver}; -use pallas_miniprotocols::handshake::n2c::{Client, VersionTable}; +use pallas_miniprotocols::handshake::{n2c::VersionTable, Initiator}; use pallas_miniprotocols::{run_agent, Point, MAINNET_MAGIC}; use pallas_multiplexer::Multiplexer; use std::os::unix::net::UnixStream; @@ -15,7 +15,7 @@ fn main() { let mut hs_channel = muxer.use_channel(0); let versions = VersionTable::v1_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap(); + let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap(); println!("last hanshake state: {:?}", last); // some random known-point in the chain to use as starting point for the sync diff --git a/pallas-miniprotocols/examples/chainsync-headers.rs b/pallas-miniprotocols/examples/chainsync-headers.rs index 3282b9f..eb1ba41 100644 --- a/pallas-miniprotocols/examples/chainsync-headers.rs +++ b/pallas-miniprotocols/examples/chainsync-headers.rs @@ -5,7 +5,7 @@ use pallas_miniprotocols::Point; use std::net::TcpStream; use pallas_miniprotocols::chainsync::{Consumer, HeaderContent, NoopObserver}; -use pallas_miniprotocols::handshake::n2n::{Client, VersionTable}; +use pallas_miniprotocols::handshake::{n2n::VersionTable, Initiator}; use pallas_miniprotocols::{run_agent, MAINNET_MAGIC}; use pallas_multiplexer::Multiplexer; @@ -23,7 +23,7 @@ fn main() { let mut hs_channel = muxer.use_channel(0); let versions = VersionTable::v4_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap(); + let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap(); println!("{:?}", last); let known_points = vec![Point::Specific( diff --git a/pallas-miniprotocols/examples/handshake-client.rs b/pallas-miniprotocols/examples/handshake-client.rs index 8d8cdc1..15126eb 100644 --- a/pallas-miniprotocols/examples/handshake-client.rs +++ b/pallas-miniprotocols/examples/handshake-client.rs @@ -1,7 +1,7 @@ use net2::TcpStreamExt; use std::net::TcpStream; -use pallas_miniprotocols::handshake::n2c::{Client, VersionTable}; +use pallas_miniprotocols::handshake::{n2c::VersionTable, Initiator}; use pallas_miniprotocols::{run_agent, MAINNET_MAGIC}; use pallas_multiplexer::Multiplexer; @@ -18,7 +18,7 @@ fn main() { let mut hs_channel = muxer.use_channel(0); let versions = VersionTable::v1_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap(); + let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap(); println!("{:?}", last); } diff --git a/pallas-miniprotocols/examples/handshake-node.rs b/pallas-miniprotocols/examples/handshake-node.rs index 6939783..128654c 100644 --- a/pallas-miniprotocols/examples/handshake-node.rs +++ b/pallas-miniprotocols/examples/handshake-node.rs @@ -1,7 +1,7 @@ use net2::TcpStreamExt; use std::net::TcpStream; -use pallas_miniprotocols::handshake::n2n::{Client, VersionTable}; +use pallas_miniprotocols::handshake::{n2n::VersionTable, Initiator}; use pallas_miniprotocols::{run_agent, MAINNET_MAGIC}; use pallas_multiplexer::Multiplexer; @@ -18,7 +18,7 @@ fn main() { let mut channel = muxer.use_channel(0); let versions = VersionTable::v4_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), &mut channel).unwrap(); + let last = run_agent(Initiator::initial(versions), &mut channel).unwrap(); println!("{:?}", last); } diff --git a/pallas-miniprotocols/examples/localstate-chainpoint.rs b/pallas-miniprotocols/examples/localstate-chainpoint.rs index 1ae8c1e..879537d 100644 --- a/pallas-miniprotocols/examples/localstate-chainpoint.rs +++ b/pallas-miniprotocols/examples/localstate-chainpoint.rs @@ -1,4 +1,4 @@ -use pallas_miniprotocols::handshake::n2c::{Client, VersionTable}; +use pallas_miniprotocols::handshake::{n2c::VersionTable, Initiator}; use pallas_miniprotocols::localstate::{ queries::{QueryV10, RequestV10}, OneShotClient, @@ -19,7 +19,7 @@ fn main() { let mut hs_channel = muxer.use_channel(0); let versions = VersionTable::only_v10(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap(); + let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap(); println!("last hanshake state: {:?}", last); let mut ls_channel = muxer.use_channel(7); diff --git a/pallas-miniprotocols/examples/txsubmission-naive.rs b/pallas-miniprotocols/examples/txsubmission-naive.rs index 4a1a0b0..68db2a4 100644 --- a/pallas-miniprotocols/examples/txsubmission-naive.rs +++ b/pallas-miniprotocols/examples/txsubmission-naive.rs @@ -1,7 +1,7 @@ use net2::TcpStreamExt; use std::net::TcpStream; -use pallas_miniprotocols::handshake::n2c::{Client, VersionTable}; +use pallas_miniprotocols::handshake::{n2c::VersionTable, Initiator}; use pallas_miniprotocols::txsubmission::NaiveProvider; use pallas_miniprotocols::{run_agent, MAINNET_MAGIC}; use pallas_multiplexer::Multiplexer; @@ -19,7 +19,7 @@ fn main() { let mut hs_channel = muxer.use_channel(0); let versions = VersionTable::v1_and_above(MAINNET_MAGIC); - let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap(); + let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap(); println!("{:?}", last); let mut ts_channel = muxer.use_channel(4); diff --git a/pallas-miniprotocols/src/blockfetch/mod.rs b/pallas-miniprotocols/src/blockfetch/mod.rs index dfe305e..6043010 100644 --- a/pallas-miniprotocols/src/blockfetch/mod.rs +++ b/pallas-miniprotocols/src/blockfetch/mod.rs @@ -1,4 +1,4 @@ -use crate::machines::{Agent, MachineOutput, Transition}; +use crate::machines::{Agent, Transition}; use crate::common::Point; @@ -133,14 +133,14 @@ where } } - fn send_request_range(self, tx: &impl MachineOutput) -> Transition { - let msg = Message::RequestRange { + fn request_range_msg(&self) -> Message { + Message::RequestRange { range: self.range.clone(), - }; + } + } - tx.send_msg(&msg)?; - - self.observer.on_block_range_requested(&self.range)?; + fn on_range_requested(self) -> Transition { + log::debug!("block range requested"); Ok(Self { state: State::Busy, @@ -155,6 +155,20 @@ where Ok(self) } + + fn on_batch_done(self) -> Transition { + Ok(Self { + state: State::Done, + ..self + }) + } + + fn on_client_done(self) -> Transition { + Ok(Self { + state: State::Done, + ..self + }) + } } impl Agent for BatchClient @@ -176,14 +190,29 @@ where } } - fn send_next(self, tx: &impl MachineOutput) -> Transition { + fn build_next(&self) -> Self::Message { match self.state { - State::Idle => self.send_request_range(tx), + State::Idle => self.request_range_msg(), _ => panic!("I don't have agency, don't know what to do"), } } - fn receive_next(self, msg: Self::Message) -> Transition { + fn apply_start(self) -> Transition { + Ok(Self { + state: State::Idle, + ..self + }) + } + + fn apply_outbound(self, msg: Self::Message) -> Transition { + match (&self.state, msg) { + (State::Idle, Message::RequestRange { .. }) => self.on_range_requested(), + (State::Idle, Message::ClientDone) => self.on_client_done(), + _ => panic!("I don't have agency, I don't expect outbound message"), + } + } + + fn apply_inbound(self, msg: Self::Message) -> Transition { match (&self.state, msg) { (State::Busy, Message::StartBatch) => Ok(Self { state: State::Streaming, @@ -194,10 +223,7 @@ where ..self }), (State::Streaming, Message::Block { body }) => self.on_block(body), - (State::Streaming, Message::BatchDone) => Ok(Self { - state: State::Done, - ..self - }), + (State::Streaming, Message::BatchDone) => self.on_batch_done(), _ => panic!("I have agency, I don't expect messages"), } } @@ -210,6 +236,8 @@ where O: Observer, { pub state: State, + pub inflight: Option<(Point, Point)>, + pub next: Option<(Point, Point)>, pub requests: I, pub observer: O, } @@ -222,44 +250,43 @@ where pub fn initial(requests: I, observer: O) -> Self { Self { state: State::Idle, + inflight: None, + next: None, requests, observer, } } - fn send_request_range(self, tx: &impl MachineOutput, point: Point) -> Transition { - log::debug!("requesting block {:?}", point); + fn wait_for_request(mut self) -> Transition { + log::debug!("waiting for external block request"); - let msg = Message::RequestRange { - range: (point.clone(), point), - }; + let next = self.requests.next(); - tx.send_msg(&msg)?; + match next { + Some(x) => Ok(Self { + state: State::Idle, + next: Some((x.clone(), x)), + ..self + }), + None => Ok(Self { + state: State::Done, + next: None, + ..self + }), + } + } + + fn on_range_requested(self, range: (Point, Point)) -> Transition { + log::debug!("requested block range {:?}", range); Ok(Self { state: State::Busy, + inflight: Some(range), + next: None, ..self }) } - fn dropout(self) -> Transition { - log::debug!("dropping out, channel will remain open"); - - Ok(Self { - state: State::Done, - ..self - }) - } - - fn wait_for_request_and_send(mut self, tx: &impl MachineOutput) -> Transition { - let point = self.requests.next(); - - match point { - Some(x) => self.send_request_range(tx, x), - None => self.dropout(), - } - } - fn on_block(self, body: Vec) -> Transition { log::debug!("received block body, size {}", body.len()); @@ -267,6 +294,17 @@ where Ok(self) } + + fn on_batch_done(self) -> Transition { + self.wait_for_request() + } + + fn on_client_done(self) -> Transition { + Ok(Self { + state: State::Done, + ..self + }) + } } impl Agent for OnDemandClient @@ -289,14 +327,29 @@ where } } - fn send_next(self, tx: &impl MachineOutput) -> Transition { - match self.state { - State::Idle => self.wait_for_request_and_send(tx), + fn build_next(&self) -> Self::Message { + match (&self.state, &self.next) { + (State::Idle, Some(range)) => Message::RequestRange { + range: range.clone(), + }, + (State::Idle, None) => panic!("I'm idle but no more block requests available"), _ => panic!("I don't have agency, don't know what to do"), } } - fn receive_next(self, msg: Self::Message) -> Transition { + fn apply_start(self) -> Transition { + self.wait_for_request() + } + + fn apply_outbound(self, msg: Self::Message) -> Transition { + match (&self.state, msg) { + (State::Idle, Message::RequestRange { range }) => self.on_range_requested(range), + (State::Idle, Message::ClientDone) => self.on_client_done(), + _ => panic!("I don't have agency, I don't expect outbound message"), + } + } + + fn apply_inbound(self, msg: Self::Message) -> Transition { match (&self.state, msg) { (State::Busy, Message::StartBatch) => Ok(Self { state: State::Streaming, @@ -307,11 +360,8 @@ where ..self }), (State::Streaming, Message::Block { body }) => self.on_block(body), - (State::Streaming, Message::BatchDone) => Ok(Self { - state: State::Idle, - ..self - }), - _ => panic!("I have agency, I don't expect messages"), + (State::Streaming, Message::BatchDone) => self.on_batch_done(), + _ => panic!("I have agency, I don't expect inbound message"), } } } diff --git a/pallas-miniprotocols/src/chainsync/clients.rs b/pallas-miniprotocols/src/chainsync/agents.rs similarity index 70% rename from pallas-miniprotocols/src/chainsync/clients.rs rename to pallas-miniprotocols/src/chainsync/agents.rs index fb35618..00b9e6b 100644 --- a/pallas-miniprotocols/src/chainsync/clients.rs +++ b/pallas-miniprotocols/src/chainsync/agents.rs @@ -1,15 +1,16 @@ +use core::panic; use std::fmt::Debug; use std::marker::PhantomData; use pallas_codec::Fragment; -use crate::machines::{Agent, MachineError, MachineOutput, Transition}; +use crate::machines::{Agent, MachineError, Transition}; use crate::common::Point; use super::{BlockContent, HeaderContent, Message, SkippedContent, State, Tip}; -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum Continuation { Proceed, DropOut, @@ -89,59 +90,6 @@ where } } - fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition { - log::debug!("requesting find intersect"); - - let points = match &self.known_points { - Some(x) => x.clone(), - None => return Err("can't find intersect without known points".into()), - }; - - let msg = Message::::FindIntersect(points); - - tx.send_msg(&msg)?; - - Ok(Self { - state: State::Intersect, - ..self - }) - } - - fn send_request_next(self, tx: &impl MachineOutput) -> Transition { - log::debug!("requesting next"); - - let msg = Message::::RequestNext; - - tx.send_msg(&msg)?; - - Ok(Self { - state: State::CanAwait, - ..self - }) - } - - fn send_done(self, tx: &impl MachineOutput) -> Transition { - log::debug!("notifying done"); - - let msg = Message::::Done; - - tx.send_msg(&msg)?; - - Ok(Self { - state: State::Done, - ..self - }) - } - - fn dropout(self) -> Transition { - log::debug!("dropping out, channel will keep open"); - - Ok(Self { - state: State::Done, - ..self - }) - } - fn on_intersect_found(mut self, point: Point, tip: Tip) -> Transition { log::debug!("intersect found: {:?} (tip: {:?})", point, tip); @@ -216,7 +164,7 @@ where type Message = Message; fn is_done(&self) -> bool { - self.state == State::Done + self.state == State::Done || self.continuation == Continuation::DropOut } fn has_agency(&self) -> bool { @@ -229,29 +177,41 @@ where } } - fn send_next(self, tx: &impl MachineOutput) -> Transition { - match self.continuation { - Continuation::Done => return self.send_done(tx), - Continuation::DropOut => return self.dropout(), - _ => (), - }; - - match self.state { - State::Idle => match self.intersect { - // keep going from pointer - Some(_) => self.send_request_next(tx), - _ => match self.known_points { - // need to find instersection first - Some(_) => self.send_find_intersect(tx), - // start from genesis - None => self.send_request_next(tx), - }, + fn build_next(&self) -> Self::Message { + match (&self.state, &self.intersect, &self.continuation) { + (State::Idle, _, Continuation::Done) => Message::::Done, + (State::Idle, None, Continuation::Proceed) => match &self.known_points { + Some(x) => Message::::FindIntersect(x.clone()), + None => Message::::RequestNext, }, - _ => panic!("I don't have agency, don't know what to do"), + (State::Idle, Some(_), Continuation::Proceed) => Message::::RequestNext, + _ => panic!(""), } } - fn receive_next(self, msg: Self::Message) -> Transition { + fn apply_start(self) -> Transition { + Ok(self) + } + + fn apply_outbound(self, msg: Self::Message) -> Transition { + match (self.state, msg) { + (State::Idle, Message::RequestNext) => Ok(Self { + state: State::CanAwait, + ..self + }), + (State::Idle, Message::FindIntersect(_)) => Ok(Self { + state: State::Intersect, + ..self + }), + (State::Idle, Message::Done) => Ok(Self { + state: State::Done, + ..self + }), + _ => panic!(""), + } + } + + fn apply_inbound(self, msg: Self::Message) -> Transition { match (&self.state, msg) { (State::CanAwait, Message::RollForward(header, tip)) => { self.on_roll_forward(header, tip) @@ -291,17 +251,6 @@ impl TipFinder { } } - fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition { - let msg = Message::::FindIntersect(vec![self.wellknown_point.clone()]); - - tx.send_msg(&msg)?; - - Ok(Self { - state: State::Intersect, - ..self - }) - } - fn on_intersect_found(self, tip: Tip) -> Transition { log::debug!("intersect found with tip: {:?}", tip); @@ -344,20 +293,36 @@ impl Agent for TipFinder { } } - fn send_next(self, tx: &impl MachineOutput) -> Transition { + fn build_next(&self) -> Self::Message { match self.state { - State::Idle => self.send_find_intersect(tx), - _ => panic!("I don't have agency, don't know what to do"), + State::Idle => { + Message::::FindIntersect(vec![self.wellknown_point.clone()]) + } + _ => panic!("I don't know what to do"), } } - fn receive_next(self, msg: Self::Message) -> Transition { + fn apply_start(self) -> Transition { + Ok(self) + } + + fn apply_outbound(self, msg: Self::Message) -> Transition { + match (self.state, msg) { + (State::Idle, Message::FindIntersect(_)) => Ok(Self { + state: State::Intersect, + ..self + }), + _ => panic!("I don't know what to do"), + } + } + + fn apply_inbound(self, msg: Self::Message) -> Transition { match (&self.state, msg) { (State::Intersect, Message::IntersectFound(_point, tip)) => { self.on_intersect_found(tip) } (State::Intersect, Message::IntersectNotFound(tip)) => self.on_intersect_not_found(tip), - (_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()), + (state, msg) => Err(MachineError::InvalidMsgForState(state.clone(), msg).into()), } } } diff --git a/pallas-miniprotocols/src/chainsync/mod.rs b/pallas-miniprotocols/src/chainsync/mod.rs index 6bdafc5..5b0b505 100644 --- a/pallas-miniprotocols/src/chainsync/mod.rs +++ b/pallas-miniprotocols/src/chainsync/mod.rs @@ -1,9 +1,9 @@ +mod agents; mod buffer; -mod clients; mod codec; mod protocol; +pub use agents::*; pub use buffer::*; -pub use clients::*; pub use codec::*; pub use protocol::*; diff --git a/pallas-miniprotocols/src/handshake/agents.rs b/pallas-miniprotocols/src/handshake/agents.rs new file mode 100644 index 0000000..0087aa6 --- /dev/null +++ b/pallas-miniprotocols/src/handshake/agents.rs @@ -0,0 +1,91 @@ +use std::fmt::Debug; + +use crate::{Agent, Transition}; + +use super::protocol::{Message, RefuseReason, State, VersionNumber, VersionTable}; + +#[derive(Debug)] +pub enum Output { + Pending, + Accepted(VersionNumber, D), + Refused(RefuseReason), +} + +#[derive(Debug)] +pub struct Initiator +where + D: Debug + Clone, +{ + pub state: State, + pub output: Output, + pub version_table: VersionTable, +} + +impl Initiator +where + D: Debug + Clone, +{ + pub fn initial(version_table: VersionTable) -> Self { + Initiator { + state: State::Propose, + output: Output::Pending, + version_table, + } + } +} + +impl Agent for Initiator +where + D: Debug + Clone, +{ + type Message = Message; + + fn is_done(&self) -> bool { + self.state == State::Done + } + + fn has_agency(&self) -> bool { + match self.state { + State::Propose => true, + State::Confirm => false, + State::Done => false, + } + } + + fn build_next(&self) -> Self::Message { + match self.state { + State::Propose => Message::Propose(self.version_table.clone()), + _ => panic!("I don't have agency, nothing to send"), + } + } + + fn apply_start(self) -> Transition { + Ok(self) + } + + fn apply_outbound(self, msg: Self::Message) -> Transition { + match (self.state, msg) { + (State::Propose, Message::Propose(_)) => Ok(Self { + state: State::Confirm, + ..self + }), + _ => panic!(""), + } + } + + fn apply_inbound(self, msg: Self::Message) -> Transition { + match (self.state, msg) { + (State::Confirm, Message::Accept(version, data)) => Ok(Self { + state: State::Done, + output: Output::Accepted(version, data), + ..self + }), + (State::Confirm, Message::Refuse(reason)) => Ok(Self { + state: State::Done, + output: Output::Refused(reason), + ..self + }), + _ => panic!("Current state does't expect to receive a message"), + } + } +} diff --git a/pallas-miniprotocols/src/handshake/mod.rs b/pallas-miniprotocols/src/handshake/mod.rs index 1ef133f..97598a8 100644 --- a/pallas-miniprotocols/src/handshake/mod.rs +++ b/pallas-miniprotocols/src/handshake/mod.rs @@ -1,4 +1,8 @@ -mod common; +mod agents; +mod protocol; pub mod n2c; pub mod n2n; + +pub use agents::*; +pub use protocol::*; diff --git a/pallas-miniprotocols/src/handshake/n2c.rs b/pallas-miniprotocols/src/handshake/n2c.rs index 535169f..273c07a 100644 --- a/pallas-miniprotocols/src/handshake/n2c.rs +++ b/pallas-miniprotocols/src/handshake/n2c.rs @@ -1,13 +1,10 @@ -use core::panic; use std::collections::HashMap; use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}; -use crate::machines::{Agent, MachineOutput}; +use super::protocol::NetworkMagic; -use super::common::{NetworkMagic, RefuseReason, VersionNumber}; - -pub type VersionTable = super::common::VersionTable; +pub type VersionTable = super::protocol::VersionTable; const PROTOCOL_V1: u64 = 1; const PROTOCOL_V2: u64 = 32770; @@ -67,131 +64,3 @@ impl<'b> Decode<'b> for VersionData { Ok(Self(network_magic)) } } - -#[derive(Debug)] -pub enum Message { - Propose(VersionTable), - Accept(VersionNumber, VersionData), - Refuse(RefuseReason), -} - -impl Encode for Message { - fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { - match self { - Message::Propose(version_table) => { - e.array(2)?.u16(0)?; - version_table.encode(e)?; - } - Message::Accept(version_number, version_data) => { - e.array(3)?.u16(1)?; - e.u64(*version_number)?; - version_data.encode(e)?; - } - Message::Refuse(reason) => { - e.array(2)?.u16(2)?; - reason.encode(e)?; - } - }; - - Ok(()) - } -} - -impl<'b> Decode<'b> for Message { - fn decode(d: &mut Decoder<'b>) -> Result { - d.array()?; - - match d.u16()? { - 0 => todo!(), - 1 => { - let version_number = d.u64()?; - let version_data = VersionData::decode(d)?; - Ok(Message::Accept(version_number, version_data)) - } - 2 => { - let reason = RefuseReason::decode(d)?; - Ok(Message::Refuse(reason)) - } - _ => Err(decode::Error::message( - "unkown variant for handshake message", - )), - } - } -} - -#[derive(Debug, PartialEq, Eq)] -pub enum State { - Propose, - Confirm, - Done, -} - -#[derive(Debug)] -pub enum Output { - Pending, - Accepted(VersionNumber, VersionData), - Refused(RefuseReason), -} - -#[derive(Debug)] -pub struct Client { - pub state: State, - pub output: Output, - pub version_table: VersionTable, -} - -impl Client { - pub fn initial(version_table: VersionTable) -> Self { - Client { - state: State::Propose, - output: Output::Pending, - version_table, - } - } -} - -impl Agent for Client { - type Message = Message; - - fn is_done(&self) -> bool { - self.state == State::Done - } - - fn has_agency(&self) -> bool { - match self.state { - State::Propose => true, - State::Confirm => false, - State::Done => false, - } - } - - fn send_next(self, tx: &impl MachineOutput) -> Result> { - match self.state { - State::Propose => { - tx.send_msg(&Message::Propose(self.version_table.clone()))?; - - Ok(Self { - state: State::Confirm, - ..self - }) - } - _ => panic!("I don't have agency, nothing to send"), - } - } - - fn receive_next(self, msg: Self::Message) -> Result> { - match (self.state, msg) { - (State::Confirm, Message::Accept(version, data)) => Ok(Self { - state: State::Done, - output: Output::Accepted(version, data), - ..self - }), - (State::Confirm, Message::Refuse(reason)) => Ok(Self { - state: State::Done, - output: Output::Refused(reason), - ..self - }), - _ => panic!("Current state does't expect to receive a message"), - } - } -} diff --git a/pallas-miniprotocols/src/handshake/n2n.rs b/pallas-miniprotocols/src/handshake/n2n.rs index e5ce6ed..fdb24d5 100644 --- a/pallas-miniprotocols/src/handshake/n2n.rs +++ b/pallas-miniprotocols/src/handshake/n2n.rs @@ -1,13 +1,8 @@ -use core::panic; use std::collections::HashMap; -use pallas_codec::minicbor::{decode, encode, Decode, Encode, Encoder}; +use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}; -use crate::machines::{Agent, MachineOutput}; - -use super::common::{RefuseReason, VersionNumber}; - -pub type VersionTable = super::common::VersionTable; +pub type VersionTable = super::protocol::VersionTable; const PROTOCOL_V4: u64 = 4; const PROTOCOL_V5: u64 = 5; @@ -66,9 +61,7 @@ impl Encode for VersionData { } impl<'b> Decode<'b> for VersionData { - fn decode( - d: &mut pallas_codec::minicbor::Decoder<'b>, - ) -> Result { + fn decode(d: &mut Decoder<'b>) -> Result { d.array()?; let network_magic = d.u64()?; let initiator_and_responder_diffusion_mode = d.bool()?; @@ -79,133 +72,3 @@ impl<'b> Decode<'b> for VersionData { }) } } - -#[derive(Debug)] -pub enum Message { - Propose(VersionTable), - Accept(VersionNumber, VersionData), - Refuse(RefuseReason), -} - -impl Encode for Message { - fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { - match self { - Message::Propose(version_table) => { - e.array(2)?.u16(0)?; - version_table.encode(e)?; - } - Message::Accept(version_number, version_data) => { - e.array(3)?.u16(1)?; - e.u64(*version_number)?; - version_data.encode(e)?; - } - Message::Refuse(reason) => { - e.array(2)?.u16(2)?; - reason.encode(e)?; - } - }; - - Ok(()) - } -} - -impl<'b> Decode<'b> for Message { - fn decode( - d: &mut pallas_codec::minicbor::Decoder<'b>, - ) -> Result { - d.array()?; - - match d.u16()? { - 0 => todo!(), - 1 => { - let version_number = d.u64()?; - let version_data = VersionData::decode(d)?; - Ok(Message::Accept(version_number, version_data)) - } - 2 => { - let reason = RefuseReason::decode(d)?; - Ok(Message::Refuse(reason)) - } - _ => Err(decode::Error::message( - "unknown variant for handshake message", - )), - } - } -} - -#[derive(Debug, PartialEq, Eq)] -pub enum State { - Propose, - Confirm, - Done, -} - -#[derive(Debug)] -pub enum Output { - Pending, - Accepted(VersionNumber, VersionData), - Refused(RefuseReason), -} - -#[derive(Debug)] -pub struct Client { - pub state: State, - pub output: Output, - pub version_table: VersionTable, -} - -impl Client { - pub fn initial(version_table: VersionTable) -> Self { - Client { - state: State::Propose, - output: Output::Pending, - version_table, - } - } -} - -impl Agent for Client { - type Message = Message; - - fn is_done(&self) -> bool { - self.state == State::Done - } - - fn has_agency(&self) -> bool { - match self.state { - State::Propose => true, - State::Confirm => false, - State::Done => false, - } - } - - fn send_next(self, tx: &impl MachineOutput) -> Result> { - match self.state { - State::Propose => { - tx.send_msg(&Message::Propose(self.version_table.clone()))?; - - Ok(Self { - state: State::Confirm, - ..self - }) - } - _ => panic!("I don't have agency, nothing to send"), - } - } - - fn receive_next(self, msg: Self::Message) -> Result> { - match (self.state, msg) { - (State::Confirm, Message::Accept(version, data)) => Ok(Self { - state: State::Done, - output: Output::Accepted(version, data), - ..self - }), - (State::Confirm, Message::Refuse(reason)) => Ok(Self { - state: State::Done, - output: Output::Refused(reason), - ..self - }), - _ => panic!("Current state does't expect to receive a message"), - } - } -} diff --git a/pallas-miniprotocols/src/handshake/common.rs b/pallas-miniprotocols/src/handshake/protocol.rs similarity index 56% rename from pallas-miniprotocols/src/handshake/common.rs rename to pallas-miniprotocols/src/handshake/protocol.rs index af75921..f3162b8 100644 --- a/pallas-miniprotocols/src/handshake/common.rs +++ b/pallas-miniprotocols/src/handshake/protocol.rs @@ -10,9 +10,9 @@ where pub values: HashMap, } -impl<'b, T> Encode for VersionTable +impl Encode for VersionTable where - T: Debug + Clone + Encode + Decode<'b>, + T: Debug + Clone + Encode, { fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { e.map(self.values.len() as u64)?; @@ -26,10 +26,89 @@ where } } +impl<'b, T> Decode<'b> for VersionTable +where + T: Debug + Clone + Decode<'b>, +{ + fn decode(_d: &mut Decoder<'b>) -> Result { + todo!() + } +} + pub type NetworkMagic = u64; pub type VersionNumber = u64; +#[derive(Debug)] +pub enum Message +where + D: Debug + Clone, +{ + Propose(VersionTable), + Accept(VersionNumber, D), + Refuse(RefuseReason), +} + +impl Encode for Message +where + D: Debug + Clone, + D: Encode, + VersionTable: Encode, +{ + fn encode(&self, e: &mut Encoder) -> Result<(), encode::Error> { + match self { + Message::Propose(version_table) => { + e.array(2)?.u16(0)?; + e.encode(version_table)?; + } + Message::Accept(version_number, version_data) => { + e.array(3)?.u16(1)?; + e.u64(*version_number)?; + e.encode(version_data)?; + } + Message::Refuse(reason) => { + e.array(2)?.u16(2)?; + reason.encode(e)?; + } + }; + + Ok(()) + } +} + +impl<'b, D> Decode<'b> for Message +where + D: Decode<'b> + Debug + Clone, + VersionTable: Decode<'b>, +{ + fn decode(d: &mut Decoder<'b>) -> Result { + d.array()?; + + match d.u16()? { + 0 => todo!(), + 1 => { + let version_number = d.u64()?; + let version_data = d.decode()?; + Ok(Message::Accept(version_number, version_data)) + } + 2 => { + let reason = RefuseReason::decode(d)?; + Ok(Message::Refuse(reason)) + } + _ => Err(decode::Error::message( + "unknown variant for handshake message", + )), + } + } +} + +#[derive(Debug, PartialEq, Eq)] +pub enum State { + Propose, + Confirm, + Done, +} + #[derive(Debug)] pub enum RefuseReason { VersionMismatch(Vec), diff --git a/pallas-miniprotocols/src/localstate/mod.rs b/pallas-miniprotocols/src/localstate/mod.rs index 95a38e9..ad278f1 100644 --- a/pallas-miniprotocols/src/localstate/mod.rs +++ b/pallas-miniprotocols/src/localstate/mod.rs @@ -5,7 +5,7 @@ use std::fmt::Debug; use pallas_codec::Fragment; -use crate::machines::{Agent, MachineError, MachineOutput, Transition}; +use crate::machines::{Agent, MachineError, Transition}; use crate::common::Point; @@ -64,39 +64,6 @@ where } } - fn send_acquire(self, tx: &impl MachineOutput) -> Transition { - let msg = Message::::Acquire(self.check_point.clone()); - - tx.send_msg(&msg)?; - - Ok(Self { - state: State::Acquiring, - ..self - }) - } - - fn send_query(self, tx: &impl MachineOutput) -> Transition { - let msg = Message::::Query(self.request.clone()); - - tx.send_msg(&msg)?; - - Ok(Self { - state: State::Querying, - ..self - }) - } - - fn send_release(self, tx: &impl MachineOutput) -> Transition { - let msg = Message::::Release; - - tx.send_msg(&msg)?; - - Ok(Self { - state: State::Idle, - ..self - }) - } - fn on_acquired(self) -> Transition { log::debug!("acquired check point for chain state"); @@ -110,7 +77,8 @@ where log::debug!("query result received: {:?}", response); Ok(Self { - state: State::Acquired, + // once we get a result, since this is a one-shot client, we mutate into Done + state: State::Done, output: Some(Ok(response)), ..self }) @@ -125,13 +93,6 @@ where ..self }) } - - fn done(self) -> Transition { - Ok(Self { - state: State::Done, - ..self - }) - } } impl Agent for OneShotClient @@ -154,21 +115,45 @@ where } } - fn send_next(self, tx: &impl MachineOutput) -> Transition { + fn build_next(&self) -> Self::Message { match (&self.state, &self.output) { // if we're idle and without a result, assume start of flow - (State::Idle, None) => self.send_acquire(tx), - // if we're idle and with a result, assume end of flow - (State::Idle, Some(_)) => self.done(), + (State::Idle, None) => Message::::Acquire(self.check_point.clone()), // if we don't have an output, assume start of query - (State::Acquired, None) => self.send_query(tx), + (State::Acquired, None) => Message::::Query(self.request.clone()), // if we have an output but still acquired, release the server - (State::Acquired, Some(_)) => self.send_release(tx), + (State::Acquired, Some(_)) => Message::::Release, _ => panic!("I don't have agency, don't know what to do"), } } - fn receive_next(self, msg: Self::Message) -> Transition { + fn apply_start(self) -> Transition { + Ok(self) + } + + fn apply_outbound(self, msg: Self::Message) -> Transition { + match (self.state, msg) { + (State::Idle, Message::Acquire(_)) => Ok(Self { + state: State::Acquiring, + ..self + }), + (State::Acquired, Message::Query(_)) => Ok(Self { + state: State::Querying, + ..self + }), + (State::Acquired, Message::Release) => Ok(Self { + state: State::Idle, + ..self + }), + (State::Idle, Message::Done) => Ok(Self { + state: State::Done, + ..self + }), + _ => panic!(""), + } + } + + fn apply_inbound(self, msg: Self::Message) -> Transition { match (&self.state, msg) { (State::Acquiring, Message::Acquired) => self.on_acquired(), (State::Acquiring, Message::Failure(failure)) => self.on_failure(failure), diff --git a/pallas-miniprotocols/src/machines.rs b/pallas-miniprotocols/src/machines.rs index 060a8a7..9fb1dab 100644 --- a/pallas-miniprotocols/src/machines.rs +++ b/pallas-miniprotocols/src/machines.rs @@ -80,8 +80,10 @@ pub trait Agent: Sized { fn is_done(&self) -> bool; fn has_agency(&self) -> bool; - fn send_next(self, tx: &impl MachineOutput) -> Transition; - fn receive_next(self, msg: Self::Message) -> Transition; + fn build_next(&self) -> Self::Message; + fn apply_start(self) -> Transition; + fn apply_outbound(self, msg: Self::Message) -> Transition; + fn apply_inbound(self, msg: Self::Message) -> Transition; } pub fn run_agent(agent: T, channel: &mut Channel) -> Result> @@ -91,20 +93,27 @@ where { let Channel(tx, rx) = channel; - let mut agent = agent; let mut buffer = Vec::new(); + let mut agent = agent.apply_start()?; + while !agent.is_done() { log::debug!("evaluating agent {:?}", agent); match agent.has_agency() { true => { - agent = agent.send_next(tx)?; + let msg = agent.build_next(); + + let mut payload = Vec::new(); + minicbor::encode(&msg, &mut payload)?; + tx.send(payload)?; + + agent = agent.apply_outbound(msg)?; } false => { let msg = read_until_full_msg::(&mut buffer, rx).unwrap(); log::trace!("procesing inbound msg: {:?}", msg); - agent = agent.receive_next(msg)?; + agent = agent.apply_inbound(msg)?; } } } diff --git a/pallas-miniprotocols/src/txsubmission/mod.rs b/pallas-miniprotocols/src/txsubmission/mod.rs index 4800803..722d68b 100644 --- a/pallas-miniprotocols/src/txsubmission/mod.rs +++ b/pallas-miniprotocols/src/txsubmission/mod.rs @@ -4,7 +4,7 @@ use itertools::Itertools; use log::debug; use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}; -use crate::machines::{Agent, MachineError, MachineOutput, Transition}; +use crate::machines::{Agent, MachineError, Transition}; #[derive(Debug, PartialEq, Clone)] pub enum State { @@ -151,7 +151,6 @@ impl<'b> Decode<'b> for Message { pub struct NaiveProvider { pub state: State, pub fifo_txs: Vec, - pub acknowledged_count: usize, pub requested_ids_count: usize, pub requested_txs: Option>, } @@ -160,49 +159,27 @@ impl NaiveProvider { pub fn initial(fifo_txs: Vec) -> Self { Self { state: State::Idle, - acknowledged_count: 0, requested_ids_count: 0, requested_txs: None, fifo_txs, } } - fn send_done(self, tx: &impl MachineOutput) -> Transition { - let msg = Message::Done; - - tx.send_msg(&msg)?; - - Ok(Self { - state: State::Done, - ..self - }) - } - - fn send_tx_ids(mut self, tx: &impl MachineOutput) -> Transition { - debug!("draining {} from tx fifo queue", self.acknowledged_count); - self.fifo_txs.drain(0..(self.acknowledged_count - 1)); - + fn reply_tx_ids_msg(&self) -> Message { debug!( "sending next {} tx ids from fifo queue", self.requested_ids_count ); + let to_send = self.fifo_txs[0..self.requested_ids_count] .iter() .map_into() .collect_vec(); - let msg = Message::ReplyTxIds(to_send); - tx.send_msg(&msg)?; - - Ok(Self { - state: State::Idle, - acknowledged_count: 0, - requested_ids_count: 0, - ..self - }) + Message::ReplyTxIds(to_send) } - fn send_txs(self, tx: &impl MachineOutput) -> Transition { + fn reply_txs_msg(&self) -> Message { let matches = self .fifo_txs .iter() @@ -213,14 +190,7 @@ impl NaiveProvider { .map(|Tx(_, body)| body.clone()) .collect_vec(); - let msg = Message::ReplyTxs(matches); - tx.send_msg(&msg)?; - - Ok(Self { - state: State::Idle, - requested_txs: None, - ..self - }) + Message::ReplyTxs(matches) } fn on_tx_ids_request( @@ -233,10 +203,17 @@ impl NaiveProvider { requested_ids_count, acknowledged_count ); + debug!("draining {} from tx fifo queue", acknowledged_count); + let new_fifo: Vec<_> = self + .fifo_txs + .into_iter() + .skip(acknowledged_count - 1) + .collect(); + Ok(Self { state: State::Idle, requested_ids_count, - acknowledged_count, + fifo_txs: new_fifo, ..self }) } @@ -269,16 +246,38 @@ impl Agent for NaiveProvider { } } - fn send_next(self, tx: &impl MachineOutput) -> Transition { - match self.state { - State::TxIdsBlocking => self.send_done(tx), - State::TxIdsNonBlocking => self.send_tx_ids(tx), - State::Txs => self.send_txs(tx), - _ => panic!("I don't have agency, don't know what to do"), + fn build_next(&self) -> Self::Message { + match &self.state { + State::TxIdsNonBlocking => self.reply_tx_ids_msg(), + State::TxIdsBlocking => Message::Done, + State::Txs => self.reply_txs_msg(), + _ => panic!(""), } } - fn receive_next(self, msg: Self::Message) -> Transition { + fn apply_start(self) -> Transition { + Ok(self) + } + + fn apply_outbound(self, msg: Self::Message) -> Transition { + match (self.state, msg) { + (State::TxIdsNonBlocking, Message::ReplyTxIds(_)) => Ok(Self { + state: State::Idle, + ..self + }), + (State::TxIdsBlocking, Message::Done) => Ok(Self { + state: State::Done, + ..self + }), + (State::Txs, Message::ReplyTxs(_)) => Ok(Self { + state: State::Idle, + ..self + }), + _ => panic!(), + } + } + + fn apply_inbound(self, msg: Self::Message) -> Transition { match (&self.state, msg) { (State::Idle, Message::RequestTxIds(block, ack, req)) if !block => { self.on_tx_ids_request(ack as usize, req as usize) diff --git a/pallas-multiplexer/src/bearers.rs b/pallas-multiplexer/src/bearers.rs index 6e93878..bf980ba 100644 --- a/pallas-multiplexer/src/bearers.rs +++ b/pallas-multiplexer/src/bearers.rs @@ -20,8 +20,9 @@ fn write_segment( if log_enabled!(log::Level::Trace) { trace!( - "sending segment, header {:?}, payload length: {}", + "sending segment, header {:?}, protocol id: {}, payload length: {}", hex::encode(&msg), + protocol_id, payload.len() ); }