feat(blockfetch): add on-demand block-fetch client

This commit is contained in:
Santiago Carmuega 2021-12-09 06:46:51 -03:00
parent 6659d2e69c
commit 61b37cce41

View file

@ -1,5 +1,10 @@
use std::sync::mpsc::Receiver;
use log::info;
use pallas_machines::{Agent, CodecError, DecodePayload, EncodePayload, MachineOutput, PayloadDecoder, PayloadEncoder, Transition, primitives::Point};
use pallas_machines::{
primitives::Point, Agent, CodecError, DecodePayload, EncodePayload, MachineOutput,
PayloadDecoder, PayloadEncoder, Transition,
};
#[derive(Debug, PartialEq, Clone)]
pub enum State {
@ -20,10 +25,7 @@ pub enum Message {
}
impl EncodePayload for Message {
fn encode_payload(
&self,
e: &mut PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
match self {
Message::RequestRange { range } => {
e.array(3)?.u16(0)?;
@ -57,9 +59,7 @@ impl EncodePayload for Message {
}
impl DecodePayload for Message {
fn decode_payload(
d: &mut PayloadDecoder,
) -> Result<Self, Box<dyn std::error::Error>> {
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.array()?;
let label = d.u16()?;
@ -87,17 +87,30 @@ impl DecodePayload for Message {
}
}
#[derive(Debug)]
pub struct BlockFetchClient {
pub state: State,
pub range: (Point, Point),
pub trait Observer {
fn on_block(&self, body: Vec<u8>) -> Result<(), Box<dyn std::error::Error>> {
log::debug!("block fetched {:?}", body);
Ok(())
}
}
impl BlockFetchClient {
pub fn initial(range: (Point, Point)) -> Self {
pub struct NoopObserver {}
impl Observer for NoopObserver {}
#[derive(Debug)]
pub struct BatchClient<O> where O: Observer {
pub state: State,
pub range: (Point, Point),
pub observer: O,
}
impl<O> BatchClient<O> where O: Observer {
pub fn initial(range: (Point, Point), observer: O) -> Self {
Self {
state: State::Idle,
range,
observer,
}
}
@ -115,7 +128,7 @@ impl BlockFetchClient {
}
}
impl Agent for BlockFetchClient {
impl<O> Agent for BatchClient<O> where O: Observer {
type Message = Message;
fn is_done(&self) -> bool {
@ -150,6 +163,7 @@ impl Agent for BlockFetchClient {
}),
(State::Streaming, Message::Block { body }) => {
info!("received block body of size {}", body.len());
self.observer.on_block(body)?;
Ok(self)
}
(State::Streaming, Message::BatchDone) => Ok(Self {
@ -160,3 +174,84 @@ impl Agent for BlockFetchClient {
}
}
}
#[derive(Debug)]
pub struct OnDemandClient<O> where O: Observer {
pub state: State,
pub requests: Receiver<Point>,
pub observer: O,
}
impl<O> OnDemandClient<O> where O: Observer {
pub fn initial(requests: Receiver<Point>, observer: O) -> Self {
Self {
state: State::Idle,
requests,
observer,
}
}
fn wait_for_request_and_send(self, tx: &impl MachineOutput) -> Transition<Self> {
let point = self.requests.recv()?;
let msg = Message::RequestRange {
range: (point.clone(), point),
};
tx.send_msg(&msg)?;
Ok(Self {
state: State::Busy,
..self
})
}
}
impl<O> Agent for OnDemandClient<O> where O: Observer {
type Message = Message;
// we're never done because we react to external work requests.
// TODO: see if we can inspect mpsc channel status and stop if disconnected
fn is_done(&self) -> bool {
false
}
fn has_agency(&self) -> bool {
match self.state {
State::Idle => true,
State::Busy => false,
State::Streaming => false,
State::Done => false,
}
}
fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
match self.state {
State::Idle => self.wait_for_request_and_send(tx),
_ => panic!("I don't have agency, don't know what to do"),
}
}
fn receive_next(self, msg: Self::Message) -> Transition<Self> {
match (&self.state, msg) {
(State::Busy, Message::StartBatch) => Ok(Self {
state: State::Streaming,
..self
}),
(State::Busy, Message::NoBlocks) => Ok(Self {
state: State::Idle,
..self
}),
(State::Streaming, Message::Block { body }) => {
info!("received block body of size {}", body.len());
self.observer.on_block(body)?;
Ok(self)
}
(State::Streaming, Message::BatchDone) => Ok(Self {
state: State::Idle,
..self
}),
_ => panic!("I have agency, I don't expect messages"),
}
}
}