use std::fmt::Debug; use log::{debug, log_enabled, trace}; use pallas_machines::{ primitives::Point, Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, Transition, }; use crate::{Message, State, Tip}; /// A trait to deal with polymorphic payloads in the ChainSync protocol /// (WrappedHeader vs BlockBody) pub trait BlockLike: EncodePayload + DecodePayload + Debug { fn block_point(&self) -> Result>; } /// An observer of chain-sync events sent by the state-machine pub trait Observer where C: Debug, { fn on_block( &self, cursor: &Option, content: &C, ) -> Result<(), Box> { log::debug!( "asked to save block content {:?} at cursor {:?}", content, cursor ); Ok(()) } fn on_intersect_found( &self, point: &Point, tip: &Tip, ) -> Result<(), Box> { log::debug!("intersect was found {:?} (tip: {:?})", point, tip); Ok(()) } fn on_rollback(&self, point: &Point) -> Result<(), Box> { log::debug!("asked to roll back {:?}", point); Ok(()) } fn on_tip_reached(&self) -> Result<(), Box> { log::debug!("tip was reached"); Ok(()) } } #[derive(Debug)] pub struct NoopObserver {} impl Observer for NoopObserver where C: Debug {} #[derive(Debug)] pub struct Consumer where O: Observer, C: Debug, { pub state: State, pub known_points: Vec, pub cursor: Option, pub tip: Option, observer: O, // as recommended here: https://doc.rust-lang.org/error-index.html#E0207 _phantom: Option, } impl Consumer where C: BlockLike + EncodePayload + DecodePayload + Debug, O: Observer, { pub fn initial(known_points: Vec, observer: O) -> Self { Self { state: State::Idle, cursor: None, tip: None, known_points, observer, _phantom: None, } } fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition { let msg = Message::::FindIntersect(self.known_points.clone()); tx.send_msg(&msg)?; Ok(Self { state: State::Intersect, ..self }) } fn send_request_next(self, tx: &impl MachineOutput) -> Transition { let msg = Message::::RequestNext; tx.send_msg(&msg)?; Ok(Self { state: State::CanAwait, ..self }) } fn on_intersect_found(self, point: Point, tip: Tip) -> Transition { debug!("intersect found: {:?} (tip: {:?})", point, tip); self.observer.on_intersect_found(&point, &tip)?; Ok(Self { tip: Some(tip), cursor: Some(point), state: State::Idle, ..self }) } fn on_intersect_not_found(self, tip: Tip) -> Transition { debug!("intersect not found (tip: {:?})", tip); Ok(Self { tip: Some(tip), cursor: None, state: State::Idle, ..self }) } fn on_roll_forward(self, content: C, tip: Tip) -> Transition { debug!("rolling forward"); let point = content.block_point()?; if log_enabled!(log::Level::Trace) { trace!("content: {:?}", content); } debug!("reporint block to observer"); self.observer.on_block(&self.cursor, &content)?; Ok(Self { cursor: Some(point), tip: Some(tip), state: State::Idle, ..self }) } fn on_roll_backward(self, point: Point, tip: Tip) -> Transition { debug!("rolling backward to point: {:?}", point); debug!("reporting rollback to observer"); self.observer.on_rollback(&point)?; Ok(Self { tip: Some(tip), cursor: Some(point), state: State::Idle, ..self }) } fn on_await_reply(self) -> Transition { debug!("reached tip, await reply"); debug!("reporting tip to observer"); self.observer.on_tip_reached()?; Ok(Self { state: State::MustReply, ..self }) } } impl Agent for Consumer where C: BlockLike + EncodePayload + DecodePayload + Debug + 'static, O: Observer, { type Message = Message; fn is_done(&self) -> bool { self.state == State::Done } fn has_agency(&self) -> bool { match self.state { State::Idle => true, State::CanAwait => false, State::MustReply => false, State::Intersect => false, State::Done => false, } } fn send_next(self, tx: &impl MachineOutput) -> Transition { match self.state { State::Idle => match self.cursor { Some(_) => self.send_request_next(tx), None => self.send_find_intersect(tx), }, _ => panic!("I don't have agency, don't know what to do"), } } fn receive_next(self, msg: Self::Message) -> Transition { match (&self.state, msg) { (State::CanAwait, Message::RollForward(header, tip)) => { self.on_roll_forward(header, tip) } (State::CanAwait, Message::RollBackward(point, tip)) => { self.on_roll_backward(point, tip) } (State::CanAwait, Message::AwaitReply) => self.on_await_reply(), (State::MustReply, Message::RollForward(header, tip)) => { self.on_roll_forward(header, tip) } (State::MustReply, Message::RollBackward(point, tip)) => { self.on_roll_backward(point, tip) } (State::Intersect, Message::IntersectFound(point, tip)) => { self.on_intersect_found(point, tip) } (State::Intersect, Message::IntersectNotFound(tip)) => self.on_intersect_not_found(tip), (_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()), } } } #[derive(Debug)] pub struct TipFinder { pub state: State, pub wellknown_point: Point, pub output: Option, } impl TipFinder { pub fn initial(wellknown_point: Point) -> Self { TipFinder { wellknown_point, output: None, state: State::Idle, } } fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition { let msg = Message::::FindIntersect(vec![self.wellknown_point.clone()]); tx.send_msg(&msg)?; Ok(Self { state: State::Intersect, ..self }) } fn on_intersect_found(self, tip: Tip) -> Transition { debug!("intersect found with tip: {:?}", tip); Ok(Self { state: State::Done, output: Some(tip), ..self }) } fn on_intersect_not_found(self, tip: Tip) -> Transition { debug!("intersect not found but still have a tip: {:?}", tip); Ok(Self { state: State::Done, output: Some(tip), ..self }) } } #[derive(Debug)] pub struct NoopContent {} impl EncodePayload for NoopContent { fn encode_payload( &self, _e: &mut pallas_machines::PayloadEncoder, ) -> Result<(), Box> { todo!() } } impl DecodePayload for NoopContent { fn decode_payload( _d: &mut pallas_machines::PayloadDecoder, ) -> Result> { todo!() } } impl BlockLike for NoopContent { fn block_point(&self) -> Result> { todo!() } } impl Agent for TipFinder { type Message = Message; fn is_done(&self) -> bool { self.state == State::Done } fn has_agency(&self) -> bool { match self.state { State::Idle => true, State::CanAwait => false, State::MustReply => false, State::Intersect => false, State::Done => false, } } fn send_next(self, tx: &impl MachineOutput) -> Transition { match self.state { State::Idle => self.send_find_intersect(tx), _ => panic!("I don't have agency, don't know what to do"), } } fn receive_next(self, msg: Self::Message) -> Transition { match (&self.state, msg) { (State::Intersect, Message::IntersectFound(_point, tip)) => { self.on_intersect_found(tip) } (State::Intersect, Message::IntersectNotFound(tip)) => self.on_intersect_not_found(tip), (_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()), } } }