diff --git a/pallas-miniprotocols/src/blockfetch/mod.rs b/pallas-miniprotocols/src/blockfetch/mod.rs index b69d9ce..dfe305e 100644 --- a/pallas-miniprotocols/src/blockfetch/mod.rs +++ b/pallas-miniprotocols/src/blockfetch/mod.rs @@ -1,5 +1,3 @@ -use std::sync::mpsc::Receiver; - use crate::machines::{Agent, MachineOutput, Transition}; use crate::common::Point; @@ -206,20 +204,22 @@ where } #[derive(Debug)] -pub struct OnDemandClient +pub struct OnDemandClient where + I: Iterator, O: Observer, { pub state: State, - pub requests: Receiver, + pub requests: I, pub observer: O, } -impl OnDemandClient +impl OnDemandClient where + I: Iterator, O: Observer, { - pub fn initial(requests: Receiver, observer: O) -> Self { + pub fn initial(requests: I, observer: O) -> Self { Self { state: State::Idle, requests, @@ -227,8 +227,8 @@ where } } - fn wait_for_request_and_send(self, tx: &impl MachineOutput) -> Transition { - let point = self.requests.recv()?; + fn send_request_range(self, tx: &impl MachineOutput, point: Point) -> Transition { + log::debug!("requesting block {:?}", point); let msg = Message::RequestRange { range: (point.clone(), point), @@ -242,6 +242,24 @@ where }) } + fn dropout(self) -> Transition { + log::debug!("dropping out, channel will remain open"); + + Ok(Self { + state: State::Done, + ..self + }) + } + + fn wait_for_request_and_send(mut self, tx: &impl MachineOutput) -> Transition { + let point = self.requests.next(); + + match point { + Some(x) => self.send_request_range(tx, x), + None => self.dropout(), + } + } + fn on_block(self, body: Vec) -> Transition { log::debug!("received block body, size {}", body.len()); @@ -251,16 +269,15 @@ where } } -impl Agent for OnDemandClient +impl Agent for OnDemandClient where + I: Iterator, O: Observer, { type Message = Message; - // we're never done because we react to external work requests. - // TODO: see if we can inspect mpsc channel status and stop if disconnected fn is_done(&self) -> bool { - false + self.state == State::Done } fn has_agency(&self) -> bool { diff --git a/pallas-miniprotocols/src/chainsync/clients.rs b/pallas-miniprotocols/src/chainsync/clients.rs index a17b74c..fb35618 100644 --- a/pallas-miniprotocols/src/chainsync/clients.rs +++ b/pallas-miniprotocols/src/chainsync/clients.rs @@ -9,34 +9,44 @@ use crate::common::Point; use super::{BlockContent, HeaderContent, Message, SkippedContent, State, Tip}; +#[derive(Debug)] +pub enum Continuation { + Proceed, + DropOut, + Done, +} + /// An observer of chain-sync events sent by the state-machine pub trait Observer { fn on_roll_forward( &mut self, _content: C, tip: &Tip, - ) -> Result<(), Box> { + ) -> Result> { log::debug!("asked to roll forward, tip at {:?}", tip); - Ok(()) + Ok(Continuation::Proceed) } fn on_intersect_found( &mut self, point: &Point, tip: &Tip, - ) -> Result<(), Box> { + ) -> Result> { log::debug!("intersect was found {:?} (tip: {:?})", point, tip); - Ok(()) + + Ok(Continuation::Proceed) } - fn on_rollback(&mut self, point: &Point) -> Result<(), Box> { + fn on_rollback(&mut self, point: &Point) -> Result> { log::debug!("asked to roll back {:?}", point); - Ok(()) + + Ok(Continuation::Proceed) } - fn on_tip_reached(&mut self) -> Result<(), Box> { + fn on_tip_reached(&mut self) -> Result> { log::debug!("tip was reached"); - Ok(()) + + Ok(Continuation::Proceed) } } @@ -55,6 +65,8 @@ where pub intersect: Option, pub tip: Option, + continuation: Continuation, + observer: O, _phantom: PhantomData, @@ -71,6 +83,7 @@ where intersect: None, tip: None, known_points, + continuation: Continuation::Proceed, observer, _phantom: PhantomData::default(), } @@ -107,15 +120,38 @@ where }) } + fn send_done(self, tx: &impl MachineOutput) -> Transition { + log::debug!("notifying done"); + + let msg = Message::::Done; + + tx.send_msg(&msg)?; + + Ok(Self { + state: State::Done, + ..self + }) + } + + fn dropout(self) -> Transition { + log::debug!("dropping out, channel will keep open"); + + Ok(Self { + state: State::Done, + ..self + }) + } + fn on_intersect_found(mut self, point: Point, tip: Tip) -> Transition { log::debug!("intersect found: {:?} (tip: {:?})", point, tip); - self.observer.on_intersect_found(&point, &tip)?; + let continuation = self.observer.on_intersect_found(&point, &tip)?; Ok(Self { tip: Some(tip), intersect: Some(point), state: State::Idle, + continuation, ..self }) } @@ -134,11 +170,12 @@ where fn on_roll_forward(mut self, content: C, tip: Tip) -> Transition { log::debug!("rolling forward"); - self.observer.on_roll_forward(content, &tip)?; + let continuation = self.observer.on_roll_forward(content, &tip)?; Ok(Self { tip: Some(tip), state: State::Idle, + continuation, ..self }) } @@ -146,13 +183,13 @@ where fn on_roll_backward(mut self, point: Point, tip: Tip) -> Transition { log::debug!("rolling backward to point: {:?}", point); - log::debug!("reporting rollback to observer"); - self.observer.on_rollback(&point)?; + let continuation = self.observer.on_rollback(&point)?; Ok(Self { tip: Some(tip), intersect: Some(point), state: State::Idle, + continuation, ..self }) } @@ -160,10 +197,11 @@ where fn on_await_reply(mut self) -> Transition { log::debug!("reached tip, await reply"); - self.observer.on_tip_reached()?; + let continuation = self.observer.on_tip_reached()?; Ok(Self { state: State::MustReply, + continuation, ..self }) } @@ -192,6 +230,12 @@ where } fn send_next(self, tx: &impl MachineOutput) -> Transition { + match self.continuation { + Continuation::Done => return self.send_done(tx), + Continuation::DropOut => return self.dropout(), + _ => (), + }; + match self.state { State::Idle => match self.intersect { // keep going from pointer