diff --git a/pallas-blockfetch/src/lib.rs b/pallas-blockfetch/src/lib.rs index 787b59c..0ba8d70 100644 --- a/pallas-blockfetch/src/lib.rs +++ b/pallas-blockfetch/src/lib.rs @@ -1,5 +1,10 @@ +use std::sync::mpsc::Receiver; + use log::info; -use pallas_machines::{Agent, CodecError, DecodePayload, EncodePayload, MachineOutput, PayloadDecoder, PayloadEncoder, Transition, primitives::Point}; +use pallas_machines::{ + primitives::Point, Agent, CodecError, DecodePayload, EncodePayload, MachineOutput, + PayloadDecoder, PayloadEncoder, Transition, +}; #[derive(Debug, PartialEq, Clone)] pub enum State { @@ -20,10 +25,7 @@ pub enum Message { } impl EncodePayload for Message { - fn encode_payload( - &self, - e: &mut PayloadEncoder, - ) -> Result<(), Box> { + fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { match self { Message::RequestRange { range } => { e.array(3)?.u16(0)?; @@ -57,9 +59,7 @@ impl EncodePayload for Message { } impl DecodePayload for Message { - fn decode_payload( - d: &mut PayloadDecoder, - ) -> Result> { + fn decode_payload(d: &mut PayloadDecoder) -> Result> { d.array()?; let label = d.u16()?; @@ -87,17 +87,30 @@ impl DecodePayload for Message { } } -#[derive(Debug)] -pub struct BlockFetchClient { - pub state: State, - pub range: (Point, Point), +pub trait Observer { + fn on_block(&self, body: Vec) -> Result<(), Box> { + log::debug!("block fetched {:?}", body); + Ok(()) + } } -impl BlockFetchClient { - pub fn initial(range: (Point, Point)) -> Self { +pub struct NoopObserver {} + +impl Observer for NoopObserver {} + +#[derive(Debug)] +pub struct BatchClient where O: Observer { + pub state: State, + pub range: (Point, Point), + pub observer: O, +} + +impl BatchClient where O: Observer { + pub fn initial(range: (Point, Point), observer: O) -> Self { Self { state: State::Idle, range, + observer, } } @@ -115,7 +128,7 @@ impl BlockFetchClient { } } -impl Agent for BlockFetchClient { +impl Agent for BatchClient where O: Observer { type Message = Message; fn is_done(&self) -> bool { @@ -150,6 +163,7 @@ impl Agent for BlockFetchClient { }), (State::Streaming, Message::Block { body }) => { info!("received block body of size {}", body.len()); + self.observer.on_block(body)?; Ok(self) } (State::Streaming, Message::BatchDone) => Ok(Self { @@ -160,3 +174,84 @@ impl Agent for BlockFetchClient { } } } + +#[derive(Debug)] +pub struct OnDemandClient where O: Observer { + pub state: State, + pub requests: Receiver, + pub observer: O, +} + +impl OnDemandClient where O: Observer { + pub fn initial(requests: Receiver, observer: O) -> Self { + Self { + state: State::Idle, + requests, + observer, + } + } + + fn wait_for_request_and_send(self, tx: &impl MachineOutput) -> Transition { + let point = self.requests.recv()?; + + let msg = Message::RequestRange { + range: (point.clone(), point), + }; + + tx.send_msg(&msg)?; + + Ok(Self { + state: State::Busy, + ..self + }) + } +} + +impl Agent for OnDemandClient where O: Observer { + type Message = Message; + + // we're never done because we react to external work requests. + // TODO: see if we can inspect mpsc channel status and stop if disconnected + fn is_done(&self) -> bool { + false + } + + fn has_agency(&self) -> bool { + match self.state { + State::Idle => true, + State::Busy => false, + State::Streaming => false, + State::Done => false, + } + } + + fn send_next(self, tx: &impl MachineOutput) -> Transition { + match self.state { + State::Idle => self.wait_for_request_and_send(tx), + _ => panic!("I don't have agency, don't know what to do"), + } + } + + fn receive_next(self, msg: Self::Message) -> Transition { + match (&self.state, msg) { + (State::Busy, Message::StartBatch) => Ok(Self { + state: State::Streaming, + ..self + }), + (State::Busy, Message::NoBlocks) => Ok(Self { + state: State::Idle, + ..self + }), + (State::Streaming, Message::Block { body }) => { + info!("received block body of size {}", body.len()); + self.observer.on_block(body)?; + Ok(self) + } + (State::Streaming, Message::BatchDone) => Ok(Self { + state: State::Idle, + ..self + }), + _ => panic!("I have agency, I don't expect messages"), + } + } +}