refactor(miniprotocols): Use pure functions for state machines (#84)

This commit is contained in:
Santiago Carmuega 2022-03-22 23:27:20 -03:00 committed by GitHub
parent 9d33defe32
commit 4faf9a124d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 446 additions and 531 deletions

View file

@ -2,7 +2,7 @@ use net2::TcpStreamExt;
use pallas::network::{
miniprotocols::{
handshake::n2n::{Client, VersionTable},
handshake::{n2n::VersionTable, Initiator},
run_agent, Point, MAINNET_MAGIC,
},
multiplexer::Multiplexer,
@ -34,7 +34,7 @@ fn main() {
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let _last = run_agent(Client::initial(versions), &mut hs_channel).unwrap();
let _last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap();
let range = (
Point::Specific(

View file

@ -2,7 +2,7 @@ use net2::TcpStreamExt;
use std::net::TcpStream;
use pallas_miniprotocols::blockfetch::{BatchClient, NoopObserver};
use pallas_miniprotocols::handshake::n2n::{Client, VersionTable};
use pallas_miniprotocols::handshake::{n2n::VersionTable, Initiator};
use pallas_miniprotocols::{run_agent, Point, MAINNET_MAGIC};
use pallas_multiplexer::Multiplexer;
@ -19,7 +19,7 @@ fn main() {
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap();
let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap();
println!("{:?}", last);
let range = (

View file

@ -1,5 +1,5 @@
use pallas_miniprotocols::chainsync::{BlockContent, Consumer, NoopObserver};
use pallas_miniprotocols::handshake::n2c::{Client, VersionTable};
use pallas_miniprotocols::handshake::{n2c::VersionTable, Initiator};
use pallas_miniprotocols::{run_agent, Point, MAINNET_MAGIC};
use pallas_multiplexer::Multiplexer;
use std::os::unix::net::UnixStream;
@ -15,7 +15,7 @@ fn main() {
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::v1_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap();
let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap();
println!("last hanshake state: {:?}", last);
// some random known-point in the chain to use as starting point for the sync

View file

@ -5,7 +5,7 @@ use pallas_miniprotocols::Point;
use std::net::TcpStream;
use pallas_miniprotocols::chainsync::{Consumer, HeaderContent, NoopObserver};
use pallas_miniprotocols::handshake::n2n::{Client, VersionTable};
use pallas_miniprotocols::handshake::{n2n::VersionTable, Initiator};
use pallas_miniprotocols::{run_agent, MAINNET_MAGIC};
use pallas_multiplexer::Multiplexer;
@ -23,7 +23,7 @@ fn main() {
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap();
let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap();
println!("{:?}", last);
let known_points = vec![Point::Specific(

View file

@ -1,7 +1,7 @@
use net2::TcpStreamExt;
use std::net::TcpStream;
use pallas_miniprotocols::handshake::n2c::{Client, VersionTable};
use pallas_miniprotocols::handshake::{n2c::VersionTable, Initiator};
use pallas_miniprotocols::{run_agent, MAINNET_MAGIC};
use pallas_multiplexer::Multiplexer;
@ -18,7 +18,7 @@ fn main() {
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::v1_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap();
let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap();
println!("{:?}", last);
}

View file

@ -1,7 +1,7 @@
use net2::TcpStreamExt;
use std::net::TcpStream;
use pallas_miniprotocols::handshake::n2n::{Client, VersionTable};
use pallas_miniprotocols::handshake::{n2n::VersionTable, Initiator};
use pallas_miniprotocols::{run_agent, MAINNET_MAGIC};
use pallas_multiplexer::Multiplexer;
@ -18,7 +18,7 @@ fn main() {
let mut channel = muxer.use_channel(0);
let versions = VersionTable::v4_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), &mut channel).unwrap();
let last = run_agent(Initiator::initial(versions), &mut channel).unwrap();
println!("{:?}", last);
}

View file

@ -1,4 +1,4 @@
use pallas_miniprotocols::handshake::n2c::{Client, VersionTable};
use pallas_miniprotocols::handshake::{n2c::VersionTable, Initiator};
use pallas_miniprotocols::localstate::{
queries::{QueryV10, RequestV10},
OneShotClient,
@ -19,7 +19,7 @@ fn main() {
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::only_v10(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap();
let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap();
println!("last hanshake state: {:?}", last);
let mut ls_channel = muxer.use_channel(7);

View file

@ -1,7 +1,7 @@
use net2::TcpStreamExt;
use std::net::TcpStream;
use pallas_miniprotocols::handshake::n2c::{Client, VersionTable};
use pallas_miniprotocols::handshake::{n2c::VersionTable, Initiator};
use pallas_miniprotocols::txsubmission::NaiveProvider;
use pallas_miniprotocols::{run_agent, MAINNET_MAGIC};
use pallas_multiplexer::Multiplexer;
@ -19,7 +19,7 @@ fn main() {
let mut hs_channel = muxer.use_channel(0);
let versions = VersionTable::v1_and_above(MAINNET_MAGIC);
let last = run_agent(Client::initial(versions), &mut hs_channel).unwrap();
let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap();
println!("{:?}", last);
let mut ts_channel = muxer.use_channel(4);

View file

@ -1,4 +1,4 @@
use crate::machines::{Agent, MachineOutput, Transition};
use crate::machines::{Agent, Transition};
use crate::common::Point;
@ -133,14 +133,14 @@ where
}
}
fn send_request_range(self, tx: &impl MachineOutput) -> Transition<Self> {
let msg = Message::RequestRange {
fn request_range_msg(&self) -> Message {
Message::RequestRange {
range: self.range.clone(),
};
}
}
tx.send_msg(&msg)?;
self.observer.on_block_range_requested(&self.range)?;
fn on_range_requested(self) -> Transition<Self> {
log::debug!("block range requested");
Ok(Self {
state: State::Busy,
@ -155,6 +155,20 @@ where
Ok(self)
}
fn on_batch_done(self) -> Transition<Self> {
Ok(Self {
state: State::Done,
..self
})
}
fn on_client_done(self) -> Transition<Self> {
Ok(Self {
state: State::Done,
..self
})
}
}
impl<O> Agent for BatchClient<O>
@ -176,14 +190,29 @@ where
}
}
fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
fn build_next(&self) -> Self::Message {
match self.state {
State::Idle => self.send_request_range(tx),
State::Idle => self.request_range_msg(),
_ => panic!("I don't have agency, don't know what to do"),
}
}
fn receive_next(self, msg: Self::Message) -> Transition<Self> {
fn apply_start(self) -> Transition<Self> {
Ok(Self {
state: State::Idle,
..self
})
}
fn apply_outbound(self, msg: Self::Message) -> Transition<Self> {
match (&self.state, msg) {
(State::Idle, Message::RequestRange { .. }) => self.on_range_requested(),
(State::Idle, Message::ClientDone) => self.on_client_done(),
_ => panic!("I don't have agency, I don't expect outbound message"),
}
}
fn apply_inbound(self, msg: Self::Message) -> Transition<Self> {
match (&self.state, msg) {
(State::Busy, Message::StartBatch) => Ok(Self {
state: State::Streaming,
@ -194,10 +223,7 @@ where
..self
}),
(State::Streaming, Message::Block { body }) => self.on_block(body),
(State::Streaming, Message::BatchDone) => Ok(Self {
state: State::Done,
..self
}),
(State::Streaming, Message::BatchDone) => self.on_batch_done(),
_ => panic!("I have agency, I don't expect messages"),
}
}
@ -210,6 +236,8 @@ where
O: Observer,
{
pub state: State,
pub inflight: Option<(Point, Point)>,
pub next: Option<(Point, Point)>,
pub requests: I,
pub observer: O,
}
@ -222,44 +250,43 @@ where
pub fn initial(requests: I, observer: O) -> Self {
Self {
state: State::Idle,
inflight: None,
next: None,
requests,
observer,
}
}
fn send_request_range(self, tx: &impl MachineOutput, point: Point) -> Transition<Self> {
log::debug!("requesting block {:?}", point);
fn wait_for_request(mut self) -> Transition<Self> {
log::debug!("waiting for external block request");
let msg = Message::RequestRange {
range: (point.clone(), point),
};
let next = self.requests.next();
tx.send_msg(&msg)?;
match next {
Some(x) => Ok(Self {
state: State::Idle,
next: Some((x.clone(), x)),
..self
}),
None => Ok(Self {
state: State::Done,
next: None,
..self
}),
}
}
fn on_range_requested(self, range: (Point, Point)) -> Transition<Self> {
log::debug!("requested block range {:?}", range);
Ok(Self {
state: State::Busy,
inflight: Some(range),
next: None,
..self
})
}
fn dropout(self) -> Transition<Self> {
log::debug!("dropping out, channel will remain open");
Ok(Self {
state: State::Done,
..self
})
}
fn wait_for_request_and_send(mut self, tx: &impl MachineOutput) -> Transition<Self> {
let point = self.requests.next();
match point {
Some(x) => self.send_request_range(tx, x),
None => self.dropout(),
}
}
fn on_block(self, body: Vec<u8>) -> Transition<Self> {
log::debug!("received block body, size {}", body.len());
@ -267,6 +294,17 @@ where
Ok(self)
}
fn on_batch_done(self) -> Transition<Self> {
self.wait_for_request()
}
fn on_client_done(self) -> Transition<Self> {
Ok(Self {
state: State::Done,
..self
})
}
}
impl<I, O> Agent for OnDemandClient<I, O>
@ -289,14 +327,29 @@ where
}
}
fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
match self.state {
State::Idle => self.wait_for_request_and_send(tx),
fn build_next(&self) -> Self::Message {
match (&self.state, &self.next) {
(State::Idle, Some(range)) => Message::RequestRange {
range: range.clone(),
},
(State::Idle, None) => panic!("I'm idle but no more block requests available"),
_ => panic!("I don't have agency, don't know what to do"),
}
}
fn receive_next(self, msg: Self::Message) -> Transition<Self> {
fn apply_start(self) -> Transition<Self> {
self.wait_for_request()
}
fn apply_outbound(self, msg: Self::Message) -> Transition<Self> {
match (&self.state, msg) {
(State::Idle, Message::RequestRange { range }) => self.on_range_requested(range),
(State::Idle, Message::ClientDone) => self.on_client_done(),
_ => panic!("I don't have agency, I don't expect outbound message"),
}
}
fn apply_inbound(self, msg: Self::Message) -> Transition<Self> {
match (&self.state, msg) {
(State::Busy, Message::StartBatch) => Ok(Self {
state: State::Streaming,
@ -307,11 +360,8 @@ where
..self
}),
(State::Streaming, Message::Block { body }) => self.on_block(body),
(State::Streaming, Message::BatchDone) => Ok(Self {
state: State::Idle,
..self
}),
_ => panic!("I have agency, I don't expect messages"),
(State::Streaming, Message::BatchDone) => self.on_batch_done(),
_ => panic!("I have agency, I don't expect inbound message"),
}
}
}

View file

@ -1,15 +1,16 @@
use core::panic;
use std::fmt::Debug;
use std::marker::PhantomData;
use pallas_codec::Fragment;
use crate::machines::{Agent, MachineError, MachineOutput, Transition};
use crate::machines::{Agent, MachineError, Transition};
use crate::common::Point;
use super::{BlockContent, HeaderContent, Message, SkippedContent, State, Tip};
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub enum Continuation {
Proceed,
DropOut,
@ -89,59 +90,6 @@ where
}
}
fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition<Self> {
log::debug!("requesting find intersect");
let points = match &self.known_points {
Some(x) => x.clone(),
None => return Err("can't find intersect without known points".into()),
};
let msg = Message::<C>::FindIntersect(points);
tx.send_msg(&msg)?;
Ok(Self {
state: State::Intersect,
..self
})
}
fn send_request_next(self, tx: &impl MachineOutput) -> Transition<Self> {
log::debug!("requesting next");
let msg = Message::<C>::RequestNext;
tx.send_msg(&msg)?;
Ok(Self {
state: State::CanAwait,
..self
})
}
fn send_done(self, tx: &impl MachineOutput) -> Transition<Self> {
log::debug!("notifying done");
let msg = Message::<C>::Done;
tx.send_msg(&msg)?;
Ok(Self {
state: State::Done,
..self
})
}
fn dropout(self) -> Transition<Self> {
log::debug!("dropping out, channel will keep open");
Ok(Self {
state: State::Done,
..self
})
}
fn on_intersect_found(mut self, point: Point, tip: Tip) -> Transition<Self> {
log::debug!("intersect found: {:?} (tip: {:?})", point, tip);
@ -216,7 +164,7 @@ where
type Message = Message<C>;
fn is_done(&self) -> bool {
self.state == State::Done
self.state == State::Done || self.continuation == Continuation::DropOut
}
fn has_agency(&self) -> bool {
@ -229,29 +177,41 @@ where
}
}
fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
match self.continuation {
Continuation::Done => return self.send_done(tx),
Continuation::DropOut => return self.dropout(),
_ => (),
};
match self.state {
State::Idle => match self.intersect {
// keep going from pointer
Some(_) => self.send_request_next(tx),
_ => match self.known_points {
// need to find instersection first
Some(_) => self.send_find_intersect(tx),
// start from genesis
None => self.send_request_next(tx),
},
fn build_next(&self) -> Self::Message {
match (&self.state, &self.intersect, &self.continuation) {
(State::Idle, _, Continuation::Done) => Message::<C>::Done,
(State::Idle, None, Continuation::Proceed) => match &self.known_points {
Some(x) => Message::<C>::FindIntersect(x.clone()),
None => Message::<C>::RequestNext,
},
_ => panic!("I don't have agency, don't know what to do"),
(State::Idle, Some(_), Continuation::Proceed) => Message::<C>::RequestNext,
_ => panic!(""),
}
}
fn receive_next(self, msg: Self::Message) -> Transition<Self> {
fn apply_start(self) -> Transition<Self> {
Ok(self)
}
fn apply_outbound(self, msg: Self::Message) -> Transition<Self> {
match (self.state, msg) {
(State::Idle, Message::RequestNext) => Ok(Self {
state: State::CanAwait,
..self
}),
(State::Idle, Message::FindIntersect(_)) => Ok(Self {
state: State::Intersect,
..self
}),
(State::Idle, Message::Done) => Ok(Self {
state: State::Done,
..self
}),
_ => panic!(""),
}
}
fn apply_inbound(self, msg: Self::Message) -> Transition<Self> {
match (&self.state, msg) {
(State::CanAwait, Message::RollForward(header, tip)) => {
self.on_roll_forward(header, tip)
@ -291,17 +251,6 @@ impl TipFinder {
}
}
fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition<Self> {
let msg = Message::<SkippedContent>::FindIntersect(vec![self.wellknown_point.clone()]);
tx.send_msg(&msg)?;
Ok(Self {
state: State::Intersect,
..self
})
}
fn on_intersect_found(self, tip: Tip) -> Transition<Self> {
log::debug!("intersect found with tip: {:?}", tip);
@ -344,20 +293,36 @@ impl Agent for TipFinder {
}
}
fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
fn build_next(&self) -> Self::Message {
match self.state {
State::Idle => self.send_find_intersect(tx),
_ => panic!("I don't have agency, don't know what to do"),
State::Idle => {
Message::<SkippedContent>::FindIntersect(vec![self.wellknown_point.clone()])
}
_ => panic!("I don't know what to do"),
}
}
fn receive_next(self, msg: Self::Message) -> Transition<Self> {
fn apply_start(self) -> Transition<Self> {
Ok(self)
}
fn apply_outbound(self, msg: Self::Message) -> Transition<Self> {
match (self.state, msg) {
(State::Idle, Message::FindIntersect(_)) => Ok(Self {
state: State::Intersect,
..self
}),
_ => panic!("I don't know what to do"),
}
}
fn apply_inbound(self, msg: Self::Message) -> Transition<Self> {
match (&self.state, msg) {
(State::Intersect, Message::IntersectFound(_point, tip)) => {
self.on_intersect_found(tip)
}
(State::Intersect, Message::IntersectNotFound(tip)) => self.on_intersect_not_found(tip),
(_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()),
(state, msg) => Err(MachineError::InvalidMsgForState(state.clone(), msg).into()),
}
}
}

View file

@ -1,9 +1,9 @@
mod agents;
mod buffer;
mod clients;
mod codec;
mod protocol;
pub use agents::*;
pub use buffer::*;
pub use clients::*;
pub use codec::*;
pub use protocol::*;

View file

@ -0,0 +1,91 @@
use std::fmt::Debug;
use crate::{Agent, Transition};
use super::protocol::{Message, RefuseReason, State, VersionNumber, VersionTable};
#[derive(Debug)]
pub enum Output<D> {
Pending,
Accepted(VersionNumber, D),
Refused(RefuseReason),
}
#[derive(Debug)]
pub struct Initiator<D>
where
D: Debug + Clone,
{
pub state: State,
pub output: Output<D>,
pub version_table: VersionTable<D>,
}
impl<D> Initiator<D>
where
D: Debug + Clone,
{
pub fn initial(version_table: VersionTable<D>) -> Self {
Initiator {
state: State::Propose,
output: Output::Pending,
version_table,
}
}
}
impl<D> Agent for Initiator<D>
where
D: Debug + Clone,
{
type Message = Message<D>;
fn is_done(&self) -> bool {
self.state == State::Done
}
fn has_agency(&self) -> bool {
match self.state {
State::Propose => true,
State::Confirm => false,
State::Done => false,
}
}
fn build_next(&self) -> Self::Message {
match self.state {
State::Propose => Message::Propose(self.version_table.clone()),
_ => panic!("I don't have agency, nothing to send"),
}
}
fn apply_start(self) -> Transition<Self> {
Ok(self)
}
fn apply_outbound(self, msg: Self::Message) -> Transition<Self> {
match (self.state, msg) {
(State::Propose, Message::Propose(_)) => Ok(Self {
state: State::Confirm,
..self
}),
_ => panic!(""),
}
}
fn apply_inbound(self, msg: Self::Message) -> Transition<Self> {
match (self.state, msg) {
(State::Confirm, Message::Accept(version, data)) => Ok(Self {
state: State::Done,
output: Output::Accepted(version, data),
..self
}),
(State::Confirm, Message::Refuse(reason)) => Ok(Self {
state: State::Done,
output: Output::Refused(reason),
..self
}),
_ => panic!("Current state does't expect to receive a message"),
}
}
}

View file

@ -1,4 +1,8 @@
mod common;
mod agents;
mod protocol;
pub mod n2c;
pub mod n2n;
pub use agents::*;
pub use protocol::*;

View file

@ -1,13 +1,10 @@
use core::panic;
use std::collections::HashMap;
use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder};
use crate::machines::{Agent, MachineOutput};
use super::protocol::NetworkMagic;
use super::common::{NetworkMagic, RefuseReason, VersionNumber};
pub type VersionTable = super::common::VersionTable<VersionData>;
pub type VersionTable = super::protocol::VersionTable<VersionData>;
const PROTOCOL_V1: u64 = 1;
const PROTOCOL_V2: u64 = 32770;
@ -67,131 +64,3 @@ impl<'b> Decode<'b> for VersionData {
Ok(Self(network_magic))
}
}
#[derive(Debug)]
pub enum Message {
Propose(VersionTable),
Accept(VersionNumber, VersionData),
Refuse(RefuseReason),
}
impl Encode for Message {
fn encode<W: encode::Write>(&self, e: &mut Encoder<W>) -> Result<(), encode::Error<W::Error>> {
match self {
Message::Propose(version_table) => {
e.array(2)?.u16(0)?;
version_table.encode(e)?;
}
Message::Accept(version_number, version_data) => {
e.array(3)?.u16(1)?;
e.u64(*version_number)?;
version_data.encode(e)?;
}
Message::Refuse(reason) => {
e.array(2)?.u16(2)?;
reason.encode(e)?;
}
};
Ok(())
}
}
impl<'b> Decode<'b> for Message {
fn decode(d: &mut Decoder<'b>) -> Result<Self, decode::Error> {
d.array()?;
match d.u16()? {
0 => todo!(),
1 => {
let version_number = d.u64()?;
let version_data = VersionData::decode(d)?;
Ok(Message::Accept(version_number, version_data))
}
2 => {
let reason = RefuseReason::decode(d)?;
Ok(Message::Refuse(reason))
}
_ => Err(decode::Error::message(
"unkown variant for handshake message",
)),
}
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum State {
Propose,
Confirm,
Done,
}
#[derive(Debug)]
pub enum Output {
Pending,
Accepted(VersionNumber, VersionData),
Refused(RefuseReason),
}
#[derive(Debug)]
pub struct Client {
pub state: State,
pub output: Output,
pub version_table: VersionTable,
}
impl Client {
pub fn initial(version_table: VersionTable) -> Self {
Client {
state: State::Propose,
output: Output::Pending,
version_table,
}
}
}
impl Agent for Client {
type Message = Message;
fn is_done(&self) -> bool {
self.state == State::Done
}
fn has_agency(&self) -> bool {
match self.state {
State::Propose => true,
State::Confirm => false,
State::Done => false,
}
}
fn send_next(self, tx: &impl MachineOutput) -> Result<Self, Box<dyn std::error::Error>> {
match self.state {
State::Propose => {
tx.send_msg(&Message::Propose(self.version_table.clone()))?;
Ok(Self {
state: State::Confirm,
..self
})
}
_ => panic!("I don't have agency, nothing to send"),
}
}
fn receive_next(self, msg: Self::Message) -> Result<Self, Box<dyn std::error::Error>> {
match (self.state, msg) {
(State::Confirm, Message::Accept(version, data)) => Ok(Self {
state: State::Done,
output: Output::Accepted(version, data),
..self
}),
(State::Confirm, Message::Refuse(reason)) => Ok(Self {
state: State::Done,
output: Output::Refused(reason),
..self
}),
_ => panic!("Current state does't expect to receive a message"),
}
}
}

View file

@ -1,13 +1,8 @@
use core::panic;
use std::collections::HashMap;
use pallas_codec::minicbor::{decode, encode, Decode, Encode, Encoder};
use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder};
use crate::machines::{Agent, MachineOutput};
use super::common::{RefuseReason, VersionNumber};
pub type VersionTable = super::common::VersionTable<VersionData>;
pub type VersionTable = super::protocol::VersionTable<VersionData>;
const PROTOCOL_V4: u64 = 4;
const PROTOCOL_V5: u64 = 5;
@ -66,9 +61,7 @@ impl Encode for VersionData {
}
impl<'b> Decode<'b> for VersionData {
fn decode(
d: &mut pallas_codec::minicbor::Decoder<'b>,
) -> Result<Self, pallas_codec::minicbor::decode::Error> {
fn decode(d: &mut Decoder<'b>) -> Result<Self, decode::Error> {
d.array()?;
let network_magic = d.u64()?;
let initiator_and_responder_diffusion_mode = d.bool()?;
@ -79,133 +72,3 @@ impl<'b> Decode<'b> for VersionData {
})
}
}
#[derive(Debug)]
pub enum Message {
Propose(VersionTable),
Accept(VersionNumber, VersionData),
Refuse(RefuseReason),
}
impl Encode for Message {
fn encode<W: encode::Write>(&self, e: &mut Encoder<W>) -> Result<(), encode::Error<W::Error>> {
match self {
Message::Propose(version_table) => {
e.array(2)?.u16(0)?;
version_table.encode(e)?;
}
Message::Accept(version_number, version_data) => {
e.array(3)?.u16(1)?;
e.u64(*version_number)?;
version_data.encode(e)?;
}
Message::Refuse(reason) => {
e.array(2)?.u16(2)?;
reason.encode(e)?;
}
};
Ok(())
}
}
impl<'b> Decode<'b> for Message {
fn decode(
d: &mut pallas_codec::minicbor::Decoder<'b>,
) -> Result<Self, pallas_codec::minicbor::decode::Error> {
d.array()?;
match d.u16()? {
0 => todo!(),
1 => {
let version_number = d.u64()?;
let version_data = VersionData::decode(d)?;
Ok(Message::Accept(version_number, version_data))
}
2 => {
let reason = RefuseReason::decode(d)?;
Ok(Message::Refuse(reason))
}
_ => Err(decode::Error::message(
"unknown variant for handshake message",
)),
}
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum State {
Propose,
Confirm,
Done,
}
#[derive(Debug)]
pub enum Output {
Pending,
Accepted(VersionNumber, VersionData),
Refused(RefuseReason),
}
#[derive(Debug)]
pub struct Client {
pub state: State,
pub output: Output,
pub version_table: VersionTable,
}
impl Client {
pub fn initial(version_table: VersionTable) -> Self {
Client {
state: State::Propose,
output: Output::Pending,
version_table,
}
}
}
impl Agent for Client {
type Message = Message;
fn is_done(&self) -> bool {
self.state == State::Done
}
fn has_agency(&self) -> bool {
match self.state {
State::Propose => true,
State::Confirm => false,
State::Done => false,
}
}
fn send_next(self, tx: &impl MachineOutput) -> Result<Self, Box<dyn std::error::Error>> {
match self.state {
State::Propose => {
tx.send_msg(&Message::Propose(self.version_table.clone()))?;
Ok(Self {
state: State::Confirm,
..self
})
}
_ => panic!("I don't have agency, nothing to send"),
}
}
fn receive_next(self, msg: Self::Message) -> Result<Self, Box<dyn std::error::Error>> {
match (self.state, msg) {
(State::Confirm, Message::Accept(version, data)) => Ok(Self {
state: State::Done,
output: Output::Accepted(version, data),
..self
}),
(State::Confirm, Message::Refuse(reason)) => Ok(Self {
state: State::Done,
output: Output::Refused(reason),
..self
}),
_ => panic!("Current state does't expect to receive a message"),
}
}
}

View file

@ -10,9 +10,9 @@ where
pub values: HashMap<u64, T>,
}
impl<'b, T> Encode for VersionTable<T>
impl<T> Encode for VersionTable<T>
where
T: Debug + Clone + Encode + Decode<'b>,
T: Debug + Clone + Encode,
{
fn encode<W: encode::Write>(&self, e: &mut Encoder<W>) -> Result<(), encode::Error<W::Error>> {
e.map(self.values.len() as u64)?;
@ -26,10 +26,89 @@ where
}
}
impl<'b, T> Decode<'b> for VersionTable<T>
where
T: Debug + Clone + Decode<'b>,
{
fn decode(_d: &mut Decoder<'b>) -> Result<Self, decode::Error> {
todo!()
}
}
pub type NetworkMagic = u64;
pub type VersionNumber = u64;
#[derive(Debug)]
pub enum Message<D>
where
D: Debug + Clone,
{
Propose(VersionTable<D>),
Accept(VersionNumber, D),
Refuse(RefuseReason),
}
impl<D> Encode for Message<D>
where
D: Debug + Clone,
D: Encode,
VersionTable<D>: Encode,
{
fn encode<W: encode::Write>(&self, e: &mut Encoder<W>) -> Result<(), encode::Error<W::Error>> {
match self {
Message::Propose(version_table) => {
e.array(2)?.u16(0)?;
e.encode(version_table)?;
}
Message::Accept(version_number, version_data) => {
e.array(3)?.u16(1)?;
e.u64(*version_number)?;
e.encode(version_data)?;
}
Message::Refuse(reason) => {
e.array(2)?.u16(2)?;
reason.encode(e)?;
}
};
Ok(())
}
}
impl<'b, D> Decode<'b> for Message<D>
where
D: Decode<'b> + Debug + Clone,
VersionTable<D>: Decode<'b>,
{
fn decode(d: &mut Decoder<'b>) -> Result<Self, decode::Error> {
d.array()?;
match d.u16()? {
0 => todo!(),
1 => {
let version_number = d.u64()?;
let version_data = d.decode()?;
Ok(Message::Accept(version_number, version_data))
}
2 => {
let reason = RefuseReason::decode(d)?;
Ok(Message::Refuse(reason))
}
_ => Err(decode::Error::message(
"unknown variant for handshake message",
)),
}
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum State {
Propose,
Confirm,
Done,
}
#[derive(Debug)]
pub enum RefuseReason {
VersionMismatch(Vec<VersionNumber>),

View file

@ -5,7 +5,7 @@ use std::fmt::Debug;
use pallas_codec::Fragment;
use crate::machines::{Agent, MachineError, MachineOutput, Transition};
use crate::machines::{Agent, MachineError, Transition};
use crate::common::Point;
@ -64,39 +64,6 @@ where
}
}
fn send_acquire(self, tx: &impl MachineOutput) -> Transition<Self> {
let msg = Message::<Q>::Acquire(self.check_point.clone());
tx.send_msg(&msg)?;
Ok(Self {
state: State::Acquiring,
..self
})
}
fn send_query(self, tx: &impl MachineOutput) -> Transition<Self> {
let msg = Message::<Q>::Query(self.request.clone());
tx.send_msg(&msg)?;
Ok(Self {
state: State::Querying,
..self
})
}
fn send_release(self, tx: &impl MachineOutput) -> Transition<Self> {
let msg = Message::<Q>::Release;
tx.send_msg(&msg)?;
Ok(Self {
state: State::Idle,
..self
})
}
fn on_acquired(self) -> Transition<Self> {
log::debug!("acquired check point for chain state");
@ -110,7 +77,8 @@ where
log::debug!("query result received: {:?}", response);
Ok(Self {
state: State::Acquired,
// once we get a result, since this is a one-shot client, we mutate into Done
state: State::Done,
output: Some(Ok(response)),
..self
})
@ -125,13 +93,6 @@ where
..self
})
}
fn done(self) -> Transition<Self> {
Ok(Self {
state: State::Done,
..self
})
}
}
impl<Q> Agent for OneShotClient<Q>
@ -154,21 +115,45 @@ where
}
}
fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
fn build_next(&self) -> Self::Message {
match (&self.state, &self.output) {
// if we're idle and without a result, assume start of flow
(State::Idle, None) => self.send_acquire(tx),
// if we're idle and with a result, assume end of flow
(State::Idle, Some(_)) => self.done(),
(State::Idle, None) => Message::<Q>::Acquire(self.check_point.clone()),
// if we don't have an output, assume start of query
(State::Acquired, None) => self.send_query(tx),
(State::Acquired, None) => Message::<Q>::Query(self.request.clone()),
// if we have an output but still acquired, release the server
(State::Acquired, Some(_)) => self.send_release(tx),
(State::Acquired, Some(_)) => Message::<Q>::Release,
_ => panic!("I don't have agency, don't know what to do"),
}
}
fn receive_next(self, msg: Self::Message) -> Transition<Self> {
fn apply_start(self) -> Transition<Self> {
Ok(self)
}
fn apply_outbound(self, msg: Self::Message) -> Transition<Self> {
match (self.state, msg) {
(State::Idle, Message::Acquire(_)) => Ok(Self {
state: State::Acquiring,
..self
}),
(State::Acquired, Message::Query(_)) => Ok(Self {
state: State::Querying,
..self
}),
(State::Acquired, Message::Release) => Ok(Self {
state: State::Idle,
..self
}),
(State::Idle, Message::Done) => Ok(Self {
state: State::Done,
..self
}),
_ => panic!(""),
}
}
fn apply_inbound(self, msg: Self::Message) -> Transition<Self> {
match (&self.state, msg) {
(State::Acquiring, Message::Acquired) => self.on_acquired(),
(State::Acquiring, Message::Failure(failure)) => self.on_failure(failure),

View file

@ -80,8 +80,10 @@ pub trait Agent: Sized {
fn is_done(&self) -> bool;
fn has_agency(&self) -> bool;
fn send_next(self, tx: &impl MachineOutput) -> Transition<Self>;
fn receive_next(self, msg: Self::Message) -> Transition<Self>;
fn build_next(&self) -> Self::Message;
fn apply_start(self) -> Transition<Self>;
fn apply_outbound(self, msg: Self::Message) -> Transition<Self>;
fn apply_inbound(self, msg: Self::Message) -> Transition<Self>;
}
pub fn run_agent<T>(agent: T, channel: &mut Channel) -> Result<T, Box<dyn std::error::Error>>
@ -91,20 +93,27 @@ where
{
let Channel(tx, rx) = channel;
let mut agent = agent;
let mut buffer = Vec::new();
let mut agent = agent.apply_start()?;
while !agent.is_done() {
log::debug!("evaluating agent {:?}", agent);
match agent.has_agency() {
true => {
agent = agent.send_next(tx)?;
let msg = agent.build_next();
let mut payload = Vec::new();
minicbor::encode(&msg, &mut payload)?;
tx.send(payload)?;
agent = agent.apply_outbound(msg)?;
}
false => {
let msg = read_until_full_msg::<T::Message>(&mut buffer, rx).unwrap();
log::trace!("procesing inbound msg: {:?}", msg);
agent = agent.receive_next(msg)?;
agent = agent.apply_inbound(msg)?;
}
}
}

View file

@ -4,7 +4,7 @@ use itertools::Itertools;
use log::debug;
use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder};
use crate::machines::{Agent, MachineError, MachineOutput, Transition};
use crate::machines::{Agent, MachineError, Transition};
#[derive(Debug, PartialEq, Clone)]
pub enum State {
@ -151,7 +151,6 @@ impl<'b> Decode<'b> for Message {
pub struct NaiveProvider {
pub state: State,
pub fifo_txs: Vec<Tx>,
pub acknowledged_count: usize,
pub requested_ids_count: usize,
pub requested_txs: Option<Vec<TxId>>,
}
@ -160,49 +159,27 @@ impl NaiveProvider {
pub fn initial(fifo_txs: Vec<Tx>) -> Self {
Self {
state: State::Idle,
acknowledged_count: 0,
requested_ids_count: 0,
requested_txs: None,
fifo_txs,
}
}
fn send_done(self, tx: &impl MachineOutput) -> Transition<Self> {
let msg = Message::Done;
tx.send_msg(&msg)?;
Ok(Self {
state: State::Done,
..self
})
}
fn send_tx_ids(mut self, tx: &impl MachineOutput) -> Transition<Self> {
debug!("draining {} from tx fifo queue", self.acknowledged_count);
self.fifo_txs.drain(0..(self.acknowledged_count - 1));
fn reply_tx_ids_msg(&self) -> Message {
debug!(
"sending next {} tx ids from fifo queue",
self.requested_ids_count
);
let to_send = self.fifo_txs[0..self.requested_ids_count]
.iter()
.map_into()
.collect_vec();
let msg = Message::ReplyTxIds(to_send);
tx.send_msg(&msg)?;
Ok(Self {
state: State::Idle,
acknowledged_count: 0,
requested_ids_count: 0,
..self
})
Message::ReplyTxIds(to_send)
}
fn send_txs(self, tx: &impl MachineOutput) -> Transition<Self> {
fn reply_txs_msg(&self) -> Message {
let matches = self
.fifo_txs
.iter()
@ -213,14 +190,7 @@ impl NaiveProvider {
.map(|Tx(_, body)| body.clone())
.collect_vec();
let msg = Message::ReplyTxs(matches);
tx.send_msg(&msg)?;
Ok(Self {
state: State::Idle,
requested_txs: None,
..self
})
Message::ReplyTxs(matches)
}
fn on_tx_ids_request(
@ -233,10 +203,17 @@ impl NaiveProvider {
requested_ids_count, acknowledged_count
);
debug!("draining {} from tx fifo queue", acknowledged_count);
let new_fifo: Vec<_> = self
.fifo_txs
.into_iter()
.skip(acknowledged_count - 1)
.collect();
Ok(Self {
state: State::Idle,
requested_ids_count,
acknowledged_count,
fifo_txs: new_fifo,
..self
})
}
@ -269,16 +246,38 @@ impl Agent for NaiveProvider {
}
}
fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
match self.state {
State::TxIdsBlocking => self.send_done(tx),
State::TxIdsNonBlocking => self.send_tx_ids(tx),
State::Txs => self.send_txs(tx),
_ => panic!("I don't have agency, don't know what to do"),
fn build_next(&self) -> Self::Message {
match &self.state {
State::TxIdsNonBlocking => self.reply_tx_ids_msg(),
State::TxIdsBlocking => Message::Done,
State::Txs => self.reply_txs_msg(),
_ => panic!(""),
}
}
fn receive_next(self, msg: Self::Message) -> Transition<Self> {
fn apply_start(self) -> Transition<Self> {
Ok(self)
}
fn apply_outbound(self, msg: Self::Message) -> Transition<Self> {
match (self.state, msg) {
(State::TxIdsNonBlocking, Message::ReplyTxIds(_)) => Ok(Self {
state: State::Idle,
..self
}),
(State::TxIdsBlocking, Message::Done) => Ok(Self {
state: State::Done,
..self
}),
(State::Txs, Message::ReplyTxs(_)) => Ok(Self {
state: State::Idle,
..self
}),
_ => panic!(),
}
}
fn apply_inbound(self, msg: Self::Message) -> Transition<Self> {
match (&self.state, msg) {
(State::Idle, Message::RequestTxIds(block, ack, req)) if !block => {
self.on_tx_ids_request(ack as usize, req as usize)

View file

@ -20,8 +20,9 @@ fn write_segment(
if log_enabled!(log::Level::Trace) {
trace!(
"sending segment, header {:?}, payload length: {}",
"sending segment, header {:?}, protocol id: {}, payload length: {}",
hex::encode(&msg),
protocol_id,
payload.len()
);
}