diff --git a/pallas-blockfetch/src/lib.rs b/pallas-blockfetch/src/lib.rs index 5ba865b..749fc28 100644 --- a/pallas-blockfetch/src/lib.rs +++ b/pallas-blockfetch/src/lib.rs @@ -1,8 +1,5 @@ use log::info; -use pallas_machines::{ - Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, - PayloadDecoder, PayloadEncoder, Transition, -}; +use pallas_machines::{Agent, CodecError, DecodePayload, EncodePayload, MachineOutput, PayloadDecoder, PayloadEncoder, Transition}; #[derive(Clone, Debug)] pub struct Point(pub u64, pub Vec); @@ -110,7 +107,7 @@ impl DecodePayload for Message { }) } 5 => Ok(Message::BatchDone), - x => Err(Box::new(MachineError::BadLabel(x))), + x => Err(Box::new(CodecError::BadLabel(x))), } } } diff --git a/pallas-chainsync/examples/blocks.rs b/pallas-chainsync/examples/blocks.rs index 84247ce..5d3251d 100644 --- a/pallas-chainsync/examples/blocks.rs +++ b/pallas-chainsync/examples/blocks.rs @@ -1,4 +1,4 @@ -use pallas_chainsync::{ClientConsumer, Point}; +use pallas_chainsync::{ClientConsumer, NoopStorage, Point}; use pallas_handshake::n2c::{Client, VersionTable}; use pallas_handshake::MAINNET_MAGIC; use pallas_machines::run_agent; @@ -26,7 +26,7 @@ fn main() { )]; let (cs_rx, cs_tx) = muxer.use_channel(5); - let cs = ClientConsumer::initial(known_points); + let cs = ClientConsumer::initial(known_points, NoopStorage { }); let cs = run_agent(cs, cs_rx, &cs_tx).unwrap(); println!("{:?}", cs); } diff --git a/pallas-chainsync/examples/headers.rs b/pallas-chainsync/examples/headers.rs index 5f9502e..683fea0 100644 --- a/pallas-chainsync/examples/headers.rs +++ b/pallas-chainsync/examples/headers.rs @@ -1,7 +1,7 @@ use net2::TcpStreamExt; use std::net::TcpStream; -use pallas_chainsync::{ClientConsumer, Point}; +use pallas_chainsync::{ClientConsumer, NoopStorage, Point}; use pallas_handshake::n2n::{Client, VersionTable}; use pallas_handshake::MAINNET_MAGIC; use pallas_machines::run_agent; @@ -29,7 +29,7 @@ fn main() { let (cs_rx, cs_tx) = muxer.use_channel(2); - let cs = ClientConsumer::initial(known_points); + let cs = ClientConsumer::initial(known_points, NoopStorage {}); let cs = run_agent(cs, cs_rx, &cs_tx).unwrap(); println!("{:?}", cs); diff --git a/pallas-chainsync/src/lib.rs b/pallas-chainsync/src/lib.rs index 6db809e..b27bee9 100644 --- a/pallas-chainsync/src/lib.rs +++ b/pallas-chainsync/src/lib.rs @@ -4,7 +4,7 @@ use log::{debug, log_enabled, trace}; use minicbor::data::Tag; use pallas_machines::{ - Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder, + Agent, CodecError, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder, PayloadEncoder, Transition, }; @@ -63,10 +63,10 @@ impl DecodePayload for WrappedHeader { } #[derive(Debug)] -pub struct BlockBody(Vec); +pub struct BlockBody(pub Vec); impl EncodePayload for BlockBody { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { + fn encode_payload(&self, _e: &mut PayloadEncoder) -> Result<(), Box> { todo!() } } @@ -116,7 +116,7 @@ pub enum State { #[derive(Debug)] pub enum Message where - C: EncodePayload + DecodePayload, + C: EncodePayload + DecodePayload + Sized, { RequestNext, AwaitReply, @@ -216,32 +216,58 @@ where Ok(Message::IntersectNotFound(tip)) } 7 => Ok(Message::Done), - x => Err(Box::new(MachineError::BadLabel(x))), + x => Err(Box::new(CodecError::BadLabel(x))), } } } +/// An abstraction of a component in charge of block persistence +pub trait Storage { + fn save_block(&self, content: &C) -> Result<(), Box>; +} + #[derive(Debug)] -pub struct Consumer { +pub struct NoopStorage {} + +impl Storage for NoopStorage +where + C: Debug, +{ + fn save_block(&self, content: &C) -> Result<(), Box> { + log::warn!("asked to save block {:?}", content); + Ok(()) + } +} + +#[derive(Debug)] +pub struct Consumer +where + S: Storage, +{ pub state: State, pub known_points: Vec, pub cursor: Option, pub tip: Option, + storage: S, + // as recommended here: https://doc.rust-lang.org/error-index.html#E0207 _phantom: Option, } -impl Consumer +impl Consumer where C: EncodePayload + DecodePayload + Debug, + S: Storage, { - pub fn initial(known_points: Vec) -> Self { + pub fn initial(known_points: Vec, storage: S) -> Self { Self { state: State::Idle, cursor: None, tip: None, known_points, + storage, + _phantom: None, } } @@ -294,9 +320,12 @@ where debug!("rolling forward"); if log_enabled!(log::Level::Trace) { - trace!("header: {:?}", content); + trace!("content: {:?}", content); } + debug!("saving block"); + self.storage.save_block(&content)?; + Ok(Self { tip: Some(tip), state: State::Idle, @@ -314,11 +343,21 @@ where ..self }) } + + fn on_await_reply(self) -> Transition { + debug!("reached tip, await reply"); + + Ok(Self { + state: State::MustReply, + ..self + }) + } } -impl Agent for Consumer +impl Agent for Consumer where - C: EncodePayload + DecodePayload + Debug, + C: EncodePayload + DecodePayload + Debug + 'static, + S: Storage, { type Message = Message; @@ -354,6 +393,7 @@ where (State::CanAwait, Message::RollBackward(point, tip)) => { self.on_roll_backward(point, tip) } + (State::CanAwait, Message::AwaitReply) => self.on_await_reply(), (State::MustReply, Message::RollForward(header, tip)) => { self.on_roll_forward(header, tip) } @@ -364,11 +404,11 @@ where self.on_intersect_found(point, tip) } (State::Intersect, Message::IntersectNotFound(tip)) => self.on_intersect_not_found(tip), - _ => Err(Box::new(MachineError::InvalidMsgForState)), + (_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()), } } } -pub type NodeConsumer = Consumer; +pub type NodeConsumer = Consumer; -pub type ClientConsumer = Consumer; +pub type ClientConsumer = Consumer; diff --git a/pallas-handshake/src/common.rs b/pallas-handshake/src/common.rs index a3334f5..de95154 100644 --- a/pallas-handshake/src/common.rs +++ b/pallas-handshake/src/common.rs @@ -1,5 +1,5 @@ use itertools::Itertools; -use pallas_machines::{DecodePayload, EncodePayload, MachineError, PayloadEncoder}; +use pallas_machines::{CodecError, DecodePayload, EncodePayload, PayloadEncoder}; use std::{collections::HashMap, fmt::Debug}; pub const TESTNET_MAGIC: u64 = 1097911063; @@ -97,7 +97,7 @@ impl DecodePayload for RefuseReason { Ok(RefuseReason::Refused(version, msg.to_string())) } - x => Err(Box::new(MachineError::BadLabel(x))), + x => Err(Box::new(CodecError::BadLabel(x))), } } } diff --git a/pallas-handshake/src/n2c.rs b/pallas-handshake/src/n2c.rs index 7961470..22e257e 100644 --- a/pallas-handshake/src/n2c.rs +++ b/pallas-handshake/src/n2c.rs @@ -1,10 +1,7 @@ use core::panic; use std::collections::HashMap; -use pallas_machines::{ - Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder, - PayloadEncoder, -}; +use pallas_machines::{Agent, CodecError, DecodePayload, EncodePayload, MachineOutput, PayloadDecoder, PayloadEncoder}; use crate::common::{NetworkMagic, RefuseReason, VersionNumber}; @@ -104,7 +101,7 @@ impl DecodePayload for Message { let reason = RefuseReason::decode_payload(d)?; Ok(Message::Refuse(reason)) } - x => Err(Box::new(MachineError::BadLabel(x))), + x => Err(Box::new(CodecError::BadLabel(x))), } } } diff --git a/pallas-handshake/src/n2n.rs b/pallas-handshake/src/n2n.rs index 10b8f5a..59cdc3a 100644 --- a/pallas-handshake/src/n2n.rs +++ b/pallas-handshake/src/n2n.rs @@ -1,10 +1,7 @@ use core::panic; use std::collections::HashMap; -use pallas_machines::{ - Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder, - PayloadEncoder, -}; +use pallas_machines::{Agent, CodecError, DecodePayload, EncodePayload, MachineOutput, PayloadDecoder, PayloadEncoder}; use crate::common::{RefuseReason, VersionNumber}; @@ -123,7 +120,7 @@ impl DecodePayload for Message { let reason = RefuseReason::decode_payload(d)?; Ok(Message::Refuse(reason)) } - x => Err(Box::new(MachineError::BadLabel(x))), + x => Err(Box::new(CodecError::BadLabel(x))), } } } diff --git a/pallas-machines/src/lib.rs b/pallas-machines/src/lib.rs index 516db16..6de0b75 100644 --- a/pallas-machines/src/lib.rs +++ b/pallas-machines/src/lib.rs @@ -1,34 +1,64 @@ use log::{debug, trace, warn}; -use minicbor::{Decoder, Encode, Encoder}; +use minicbor::{Decoder, Encoder}; use pallas_multiplexer::Payload; use std::borrow::Borrow; use std::fmt::{Debug, Display}; use std::sync::mpsc::{Receiver, Sender}; #[derive(Debug)] -pub enum MachineError { - BadLabel(u16), - UnexpectedCbor(&'static str), - InvalidMsgForState, +pub enum MachineError +where + State: Debug, + Msg: Debug, +{ + InvalidMsgForState(State, Msg), } -impl Display for MachineError { +impl Display for MachineError +where + S: Debug, + M: Debug, +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - MachineError::BadLabel(label) => { - write!(f, "unknown message label: {}", label) - } - MachineError::UnexpectedCbor(msg) => { - write!(f, "unexpected cbor: {}", msg) - } - MachineError::InvalidMsgForState => { - write!(f, "received invalid message for current state") + MachineError::InvalidMsgForState(msg, state) => { + write!( + f, + "received invalid message ({:?}) for current state ({:?})", + msg, state + ) } } } } -impl std::error::Error for MachineError {} +impl std::error::Error for MachineError +where + S: Debug, + M: Debug, +{ +} + +#[derive(Debug)] +pub enum CodecError { + BadLabel(u16), + UnexpectedCbor(&'static str), +} + +impl std::error::Error for CodecError {} + +impl Display for CodecError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CodecError::BadLabel(label) => { + write!(f, "unknown message label: {}", label) + } + CodecError::UnexpectedCbor(msg) => { + write!(f, "unexpected cbor: {}", msg) + } + } + } +} pub type PayloadEncoder<'a> = Encoder<&'a mut Vec>; @@ -46,7 +76,6 @@ pub fn to_payload(data: &dyn EncodePayload) -> Result EncodePayload for Vec where D: EncodePayload, @@ -67,7 +96,7 @@ where D: DecodePayload, { fn decode_payload(d: &mut PayloadDecoder) -> Result> { - let len = d.array()?.ok_or(MachineError::UnexpectedCbor( + let len = d.array()?.ok_or(CodecError::UnexpectedCbor( "expecting definite-length array", ))? as usize; @@ -78,7 +107,6 @@ where } Ok(output) - } } diff --git a/pallas-txsubmission/src/lib.rs b/pallas-txsubmission/src/lib.rs index b12824e..0d6203b 100644 --- a/pallas-txsubmission/src/lib.rs +++ b/pallas-txsubmission/src/lib.rs @@ -3,10 +3,7 @@ use std::fmt::Debug; use itertools::Itertools; use log::debug; -use pallas_machines::{ - Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder, - PayloadEncoder, Transition, -}; +use pallas_machines::{Agent, CodecError, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder, PayloadEncoder, Transition}; #[derive(Debug, PartialEq, Clone)] pub enum State { @@ -134,7 +131,7 @@ impl DecodePayload for Message { todo!() } 4 => Ok(Message::Done), - x => Err(Box::new(MachineError::BadLabel(x))), + x => Err(Box::new(CodecError::BadLabel(x))), } } } @@ -294,7 +291,7 @@ impl Agent for NaiveProvider { ..self }), (State::Idle, Message::RequestTxs(ids)) => self.on_txs_request(ids), - _ => Err(Box::new(MachineError::InvalidMsgForState)), + (_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()), } } }