diff --git a/pallas-chainsync/src/clients.rs b/pallas-chainsync/src/clients.rs new file mode 100644 index 0000000..e439e8b --- /dev/null +++ b/pallas-chainsync/src/clients.rs @@ -0,0 +1,306 @@ +use std::fmt::Debug; + +use log::{debug, log_enabled, trace}; + +use pallas_machines::{ + primitives::Point, Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, Transition, +}; + +use crate::{BlockBody, Message, State, Tip, WrappedHeader}; + +/// An observer of chain-sync events sent by the state-machine +pub trait Observer where C: Debug { + fn on_block(&self, content: &C) -> Result<(), Box> { + log::debug!("asked to save block {:?}", content); + Ok(()) + } + + fn on_intersect_found( + &self, + point: &Point, + tip: &Tip, + ) -> Result<(), Box> { + log::debug!("intersect was found {:?} (tip: {:?})", point, tip); + Ok(()) + } + + fn on_rollback(&self, point: &Point) -> Result<(), Box> { + log::debug!("asked to roll back {:?}", point); + Ok(()) + } + fn on_tip_reached(&self) -> Result<(), Box> { + log::debug!("tip was reached"); + Ok(()) + } +} + +#[derive(Debug)] +pub struct NoopObserver {} + +impl Observer for NoopObserver where C: Debug {} + +#[derive(Debug)] +pub struct Consumer +where + O: Observer, + C: Debug, +{ + pub state: State, + pub known_points: Vec, + pub cursor: Option, + pub tip: Option, + + observer: O, + + // as recommended here: https://doc.rust-lang.org/error-index.html#E0207 + _phantom: Option, +} + +impl Consumer +where + C: EncodePayload + DecodePayload + Debug, + O: Observer, +{ + pub fn initial(known_points: Vec, observer: O) -> Self { + Self { + state: State::Idle, + cursor: None, + tip: None, + known_points, + observer, + + _phantom: None, + } + } + + fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition { + let msg = Message::::FindIntersect(self.known_points.clone()); + + tx.send_msg(&msg)?; + + Ok(Self { + state: State::Intersect, + ..self + }) + } + + fn send_request_next(self, tx: &impl MachineOutput) -> Transition { + let msg = Message::::RequestNext; + + tx.send_msg(&msg)?; + + Ok(Self { + state: State::CanAwait, + ..self + }) + } + + fn on_intersect_found(self, point: Point, tip: Tip) -> Transition { + debug!("intersect found: {:?} (tip: {:?})", point, tip); + + self.observer.on_intersect_found(&point, &tip)?; + + Ok(Self { + tip: Some(tip), + cursor: Some(point), + state: State::Idle, + ..self + }) + } + + fn on_intersect_not_found(self, tip: Tip) -> Transition { + debug!("intersect not found (tip: {:?})", tip); + + Ok(Self { + tip: Some(tip), + cursor: None, + state: State::Idle, + ..self + }) + } + + fn on_roll_forward(self, content: C, tip: Tip) -> Transition { + debug!("rolling forward"); + + if log_enabled!(log::Level::Trace) { + trace!("content: {:?}", content); + } + + debug!("reporint block to observer"); + self.observer.on_block(&content)?; + + Ok(Self { + tip: Some(tip), + state: State::Idle, + ..self + }) + } + + fn on_roll_backward(self, point: Point, tip: Tip) -> Transition { + debug!("rolling backward to point: {:?}", point); + + debug!("reporting rollback to observer"); + self.observer.on_rollback(&point)?; + + Ok(Self { + tip: Some(tip), + cursor: Some(point), + state: State::Idle, + ..self + }) + } + + fn on_await_reply(self) -> Transition { + debug!("reached tip, await reply"); + + debug!("reporting tip to observer"); + self.observer.on_tip_reached()?; + + Ok(Self { + state: State::MustReply, + ..self + }) + } +} + +impl Agent for Consumer +where + C: EncodePayload + DecodePayload + Debug + 'static, + O: Observer, +{ + type Message = Message; + + fn is_done(&self) -> bool { + self.state == State::Done + } + + fn has_agency(&self) -> bool { + match self.state { + State::Idle => true, + State::CanAwait => false, + State::MustReply => false, + State::Intersect => false, + State::Done => false, + } + } + + fn send_next(self, tx: &impl MachineOutput) -> Transition { + match self.state { + State::Idle => match self.cursor { + Some(_) => self.send_request_next(tx), + None => self.send_find_intersect(tx), + }, + _ => panic!("I don't have agency, don't know what to do"), + } + } + + fn receive_next(self, msg: Self::Message) -> Transition { + match (&self.state, msg) { + (State::CanAwait, Message::RollForward(header, tip)) => { + self.on_roll_forward(header, tip) + } + (State::CanAwait, Message::RollBackward(point, tip)) => { + self.on_roll_backward(point, tip) + } + (State::CanAwait, Message::AwaitReply) => self.on_await_reply(), + (State::MustReply, Message::RollForward(header, tip)) => { + self.on_roll_forward(header, tip) + } + (State::MustReply, Message::RollBackward(point, tip)) => { + self.on_roll_backward(point, tip) + } + (State::Intersect, Message::IntersectFound(point, tip)) => { + self.on_intersect_found(point, tip) + } + (State::Intersect, Message::IntersectNotFound(tip)) => self.on_intersect_not_found(tip), + (_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()), + } + } +} + +pub type NodeConsumer = Consumer; + +pub type ClientConsumer = Consumer; + +#[derive(Debug)] +pub struct TipFinder { + pub state: State, + pub wellknown_point: Point, + pub output: Option, +} + +impl TipFinder { + pub fn initial(wellknown_point: Point) -> Self { + TipFinder { + wellknown_point, + output: None, + state: State::Idle, + } + } + + fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition { + let msg = Message::::FindIntersect(vec![self.wellknown_point.clone()]); + + tx.send_msg(&msg)?; + + Ok(Self { + state: State::Intersect, + ..self + }) + } + + fn on_intersect_found(self, tip: Tip) -> Transition { + debug!("intersect found with tip: {:?}", tip); + + Ok(Self { + state: State::Done, + output: Some(tip), + ..self + }) + } + + fn on_intersect_not_found(self, tip: Tip) -> Transition { + debug!("intersect not found but still have a tip: {:?}", tip); + + Ok(Self { + state: State::Done, + output: Some(tip), + ..self + }) + } +} + +impl Agent for TipFinder { + type Message = Message; + + fn is_done(&self) -> bool { + self.state == State::Done + } + + fn has_agency(&self) -> bool { + match self.state { + State::Idle => true, + State::CanAwait => false, + State::MustReply => false, + State::Intersect => false, + State::Done => false, + } + } + + fn send_next(self, tx: &impl MachineOutput) -> Transition { + match self.state { + State::Idle => self.send_find_intersect(tx), + _ => panic!("I don't have agency, don't know what to do"), + } + } + + fn receive_next(self, msg: Self::Message) -> Transition { + match (&self.state, msg) { + (State::Intersect, Message::IntersectFound(_point, tip)) => { + self.on_intersect_found(tip) + } + (State::Intersect, Message::IntersectNotFound(tip)) => self.on_intersect_not_found(tip), + (_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()), + } + } +} diff --git a/pallas-chainsync/src/codec.rs b/pallas-chainsync/src/codec.rs new file mode 100644 index 0000000..516b430 --- /dev/null +++ b/pallas-chainsync/src/codec.rs @@ -0,0 +1,157 @@ +use minicbor::data::Tag; + +use pallas_machines::{ + primitives::Point, CodecError, DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder, +}; + +use crate::{BlockBody, Message, Tip, WrappedHeader}; + +impl EncodePayload for WrappedHeader { + fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { + e.array(2)?; + e.u64(self.0)?; + e.tag(Tag::Cbor)?; + e.bytes(&self.1)?; + + Ok(()) + } +} + +impl DecodePayload for WrappedHeader { + fn decode_payload(d: &mut PayloadDecoder) -> Result> { + d.array()?; + let unknown = d.u64()?; // WTF is this value? + d.tag()?; + let bytes = Vec::from(d.bytes()?); + + Ok(WrappedHeader(unknown, bytes)) + } +} + +impl EncodePayload for BlockBody { + fn encode_payload(&self, _e: &mut PayloadEncoder) -> Result<(), Box> { + todo!() + } +} + +impl DecodePayload for BlockBody { + fn decode_payload(d: &mut PayloadDecoder) -> Result> { + d.tag()?; + let bytes = Vec::from(d.bytes()?); + + Ok(BlockBody(bytes)) + } +} + +impl EncodePayload for Tip { + fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { + e.array(2)?; + self.0.encode_payload(e)?; + e.u64(self.1)?; + + Ok(()) + } +} + +impl DecodePayload for Tip { + fn decode_payload(d: &mut PayloadDecoder) -> Result> { + d.array()?; + let point = Point::decode_payload(d)?; + let block_num = d.u64()?; + + Ok(Tip(point, block_num)) + } +} + +impl EncodePayload for Message +where + C: EncodePayload + DecodePayload, +{ + fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { + match self { + Message::RequestNext => { + e.array(1)?.u16(0)?; + Ok(()) + } + Message::AwaitReply => { + e.array(1)?.u16(1)?; + Ok(()) + } + Message::RollForward(header, tip) => { + e.array(3)?.u16(2)?; + header.encode_payload(e)?; + tip.encode_payload(e)?; + Ok(()) + } + Message::RollBackward(point, tip) => { + e.array(3)?.u16(3)?; + point.encode_payload(e)?; + tip.encode_payload(e)?; + Ok(()) + } + Message::FindIntersect(points) => { + e.array(2)?.u16(4)?; + e.array(points.len() as u64)?; + for point in points.iter() { + point.encode_payload(e)?; + } + Ok(()) + } + Message::IntersectFound(point, tip) => { + e.array(3)?.u16(5)?; + point.encode_payload(e)?; + tip.encode_payload(e)?; + Ok(()) + } + Message::IntersectNotFound(tip) => { + e.array(1)?.u16(6)?; + tip.encode_payload(e)?; + Ok(()) + } + Message::Done => { + e.array(1)?.u16(7)?; + Ok(()) + } + } + } +} + +impl DecodePayload for Message +where + C: EncodePayload + DecodePayload, +{ + fn decode_payload(d: &mut PayloadDecoder) -> Result> { + d.array()?; + let label = d.u16()?; + + match label { + 0 => Ok(Message::RequestNext), + 1 => Ok(Message::AwaitReply), + 2 => { + let content = C::decode_payload(d)?; + let tip = Tip::decode_payload(d)?; + Ok(Message::RollForward(content, tip)) + } + 3 => { + let point = Point::decode_payload(d)?; + let tip = Tip::decode_payload(d)?; + Ok(Message::RollBackward(point, tip)) + } + 4 => { + let points = Vec::::decode_payload(d)?; + Ok(Message::FindIntersect(points)) + } + 5 => { + let point = Point::decode_payload(d)?; + let tip = Tip::decode_payload(d)?; + Ok(Message::IntersectFound(point, tip)) + } + 6 => { + let tip = Tip::decode_payload(d)?; + Ok(Message::IntersectNotFound(tip)) + } + 7 => Ok(Message::Done), + x => Err(Box::new(CodecError::BadLabel(x))), + } + } +} diff --git a/pallas-chainsync/src/lib.rs b/pallas-chainsync/src/lib.rs index e5da871..81be990 100644 --- a/pallas-chainsync/src/lib.rs +++ b/pallas-chainsync/src/lib.rs @@ -1,391 +1,7 @@ -use std::fmt::Debug; +mod clients; +mod codec; +mod protocol; -use log::{debug, log_enabled, trace}; - -use minicbor::data::Tag; -use pallas_machines::{Agent, CodecError, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder, PayloadEncoder, Transition, primitives::Point}; - -#[derive(Debug)] -pub struct WrappedHeader(u64, Vec); - -impl EncodePayload for WrappedHeader { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { - e.array(2)?; - e.u64(self.0)?; - e.tag(Tag::Cbor)?; - e.bytes(&self.1)?; - - Ok(()) - } -} - -impl DecodePayload for WrappedHeader { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { - d.array()?; - let unknown = d.u64()?; // WTF is this value? - d.tag()?; - let bytes = Vec::from(d.bytes()?); - - Ok(WrappedHeader(unknown, bytes)) - } -} - -#[derive(Debug)] -pub struct BlockBody(pub Vec); - -impl EncodePayload for BlockBody { - fn encode_payload(&self, _e: &mut PayloadEncoder) -> Result<(), Box> { - todo!() - } -} - -impl DecodePayload for BlockBody { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { - d.tag()?; - let bytes = Vec::from(d.bytes()?); - - Ok(BlockBody(bytes)) - } -} - -#[derive(Debug)] -pub struct Tip(Point, u64); - -impl EncodePayload for Tip { - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { - e.array(2)?; - self.0.encode_payload(e)?; - e.u64(self.1)?; - - Ok(()) - } -} - -impl DecodePayload for Tip { - fn decode_payload(d: &mut PayloadDecoder) -> Result> { - d.array()?; - let point = Point::decode_payload(d)?; - let block_num = d.u64()?; - - Ok(Tip(point, block_num)) - } -} - -#[derive(Debug, PartialEq, Clone)] -pub enum State { - Idle, - CanAwait, - MustReply, - Intersect, - Done, -} - -/// A generic chain-sync message for either header or block content -#[derive(Debug)] -pub enum Message -where - C: EncodePayload + DecodePayload + Sized, -{ - RequestNext, - AwaitReply, - RollForward(C, Tip), - RollBackward(Point, Tip), - FindIntersect(Vec), - IntersectFound(Point, Tip), - IntersectNotFound(Tip), - Done, -} - -impl EncodePayload for Message -where - C: EncodePayload + DecodePayload, -{ - fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { - match self { - Message::RequestNext => { - e.array(1)?.u16(0)?; - Ok(()) - } - Message::AwaitReply => { - e.array(1)?.u16(1)?; - Ok(()) - } - Message::RollForward(header, tip) => { - e.array(3)?.u16(2)?; - header.encode_payload(e)?; - tip.encode_payload(e)?; - Ok(()) - } - Message::RollBackward(point, tip) => { - e.array(3)?.u16(3)?; - point.encode_payload(e)?; - tip.encode_payload(e)?; - Ok(()) - } - Message::FindIntersect(points) => { - e.array(2)?.u16(4)?; - e.array(points.len() as u64)?; - for point in points.iter() { - point.encode_payload(e)?; - } - Ok(()) - } - Message::IntersectFound(point, tip) => { - e.array(3)?.u16(5)?; - point.encode_payload(e)?; - tip.encode_payload(e)?; - Ok(()) - } - Message::IntersectNotFound(tip) => { - e.array(1)?.u16(6)?; - tip.encode_payload(e)?; - Ok(()) - } - Message::Done => { - e.array(1)?.u16(7)?; - Ok(()) - } - } - } -} - -impl DecodePayload for Message -where - C: EncodePayload + DecodePayload, -{ - fn decode_payload(d: &mut PayloadDecoder) -> Result> { - d.array()?; - let label = d.u16()?; - - match label { - 0 => Ok(Message::RequestNext), - 1 => Ok(Message::AwaitReply), - 2 => { - let content = C::decode_payload(d)?; - let tip = Tip::decode_payload(d)?; - Ok(Message::RollForward(content, tip)) - } - 3 => { - let point = Point::decode_payload(d)?; - let tip = Tip::decode_payload(d)?; - Ok(Message::RollBackward(point, tip)) - } - 4 => { - let points = Vec::::decode_payload(d)?; - Ok(Message::FindIntersect(points)) - } - 5 => { - let point = Point::decode_payload(d)?; - let tip = Tip::decode_payload(d)?; - Ok(Message::IntersectFound(point, tip)) - } - 6 => { - let tip = Tip::decode_payload(d)?; - Ok(Message::IntersectNotFound(tip)) - } - 7 => Ok(Message::Done), - x => Err(Box::new(CodecError::BadLabel(x))), - } - } -} - -/// An observer of chain-sync events sent by the state-machine -pub trait Observer { - fn on_block(&self, content: &C) -> Result<(), Box>; - fn on_rollback(&self, point: &Point) -> Result<(), Box>; -} - -#[derive(Debug)] -pub struct NoopObserver {} - -impl Observer for NoopObserver -where - C: Debug, -{ - fn on_block(&self, content: &C) -> Result<(), Box> { - log::warn!("asked to save block {:?}", content); - Ok(()) - } - - fn on_rollback(&self, point: &Point) -> Result<(), Box> { - log::warn!("asked to roll back {:?}", point); - Ok(()) - } -} - -#[derive(Debug)] -pub struct Consumer -where - O: Observer, -{ - pub state: State, - pub known_points: Vec, - pub cursor: Option, - pub tip: Option, - - observer: O, - - // as recommended here: https://doc.rust-lang.org/error-index.html#E0207 - _phantom: Option, -} - -impl Consumer -where - C: EncodePayload + DecodePayload + Debug, - O: Observer, -{ - pub fn initial(known_points: Vec, observer: O) -> Self { - Self { - state: State::Idle, - cursor: None, - tip: None, - known_points, - observer, - - _phantom: None, - } - } - - fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition { - let msg = Message::::FindIntersect(self.known_points.clone()); - - tx.send_msg(&msg)?; - - Ok(Self { - state: State::Intersect, - ..self - }) - } - - fn send_request_next(self, tx: &impl MachineOutput) -> Transition { - let msg = Message::::RequestNext; - - tx.send_msg(&msg)?; - - Ok(Self { - state: State::CanAwait, - ..self - }) - } - - fn on_intersect_found(self, point: Point, tip: Tip) -> Transition { - debug!("intersect found: {:?} (tip: {:?})", point, tip); - - Ok(Self { - tip: Some(tip), - cursor: Some(point), - state: State::Idle, - ..self - }) - } - - fn on_intersect_not_found(self, tip: Tip) -> Transition { - debug!("intersect not found (tip: {:?})", tip); - - Ok(Self { - tip: Some(tip), - cursor: None, - state: State::Idle, - ..self - }) - } - - fn on_roll_forward(self, content: C, tip: Tip) -> Transition { - debug!("rolling forward"); - - if log_enabled!(log::Level::Trace) { - trace!("content: {:?}", content); - } - - debug!("reporint block to observer"); - self.observer.on_block(&content)?; - - Ok(Self { - tip: Some(tip), - state: State::Idle, - ..self - }) - } - - fn on_roll_backward(self, point: Point, tip: Tip) -> Transition { - debug!("rolling backward to point: {:?}", point); - - debug!("reporting rollback to observer"); - self.observer.on_rollback(&point)?; - - Ok(Self { - tip: Some(tip), - cursor: Some(point), - state: State::Idle, - ..self - }) - } - - fn on_await_reply(self) -> Transition { - debug!("reached tip, await reply"); - - Ok(Self { - state: State::MustReply, - ..self - }) - } -} - -impl Agent for Consumer -where - C: EncodePayload + DecodePayload + Debug + 'static, - O: Observer, -{ - type Message = Message; - - fn is_done(&self) -> bool { - self.state == State::Done - } - - fn has_agency(&self) -> bool { - match self.state { - State::Idle => true, - State::CanAwait => false, - State::MustReply => false, - State::Intersect => false, - State::Done => false, - } - } - - fn send_next(self, tx: &impl MachineOutput) -> Transition { - match self.state { - State::Idle => match self.cursor { - Some(_) => self.send_request_next(tx), - None => self.send_find_intersect(tx), - }, - _ => panic!("I don't have agency, don't know what to do"), - } - } - - fn receive_next(self, msg: Self::Message) -> Transition { - match (&self.state, msg) { - (State::CanAwait, Message::RollForward(header, tip)) => { - self.on_roll_forward(header, tip) - } - (State::CanAwait, Message::RollBackward(point, tip)) => { - self.on_roll_backward(point, tip) - } - (State::CanAwait, Message::AwaitReply) => self.on_await_reply(), - (State::MustReply, Message::RollForward(header, tip)) => { - self.on_roll_forward(header, tip) - } - (State::MustReply, Message::RollBackward(point, tip)) => { - self.on_roll_backward(point, tip) - } - (State::Intersect, Message::IntersectFound(point, tip)) => { - self.on_intersect_found(point, tip) - } - (State::Intersect, Message::IntersectNotFound(tip)) => self.on_intersect_not_found(tip), - (_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()), - } - } -} - -pub type NodeConsumer = Consumer; - -pub type ClientConsumer = Consumer; +pub use clients::*; +pub use codec::*; +pub use protocol::*; diff --git a/pallas-chainsync/src/protocol.rs b/pallas-chainsync/src/protocol.rs new file mode 100644 index 0000000..e981c0b --- /dev/null +++ b/pallas-chainsync/src/protocol.rs @@ -0,0 +1,37 @@ +use std::fmt::Debug; + +use pallas_machines::{primitives::Point, DecodePayload, EncodePayload}; + +#[derive(Debug)] +pub struct WrappedHeader(pub u64, pub Vec); + +#[derive(Debug)] +pub struct BlockBody(pub Vec); + +#[derive(Debug)] +pub struct Tip(pub Point, pub u64); + +#[derive(Debug, PartialEq, Clone)] +pub enum State { + Idle, + CanAwait, + MustReply, + Intersect, + Done, +} + +/// A generic chain-sync message for either header or block content +#[derive(Debug)] +pub enum Message +where + C: EncodePayload + DecodePayload + Sized, +{ + RequestNext, + AwaitReply, + RollForward(C, Tip), + RollBackward(Point, Tip), + FindIntersect(Vec), + IntersectFound(Point, Tip), + IntersectNotFound(Tip), + Done, +}