feat(miniprotocols): Allow graceful exit on chainsync and blockfetch (#83)

This commit is contained in:
Santiago Carmuega 2022-03-22 08:26:46 -03:00 committed by GitHub
parent 6eb48b7469
commit 9d33defe32
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 86 additions and 25 deletions

View file

@ -1,5 +1,3 @@
use std::sync::mpsc::Receiver;
use crate::machines::{Agent, MachineOutput, Transition};
use crate::common::Point;
@ -206,20 +204,22 @@ where
}
#[derive(Debug)]
pub struct OnDemandClient<O>
pub struct OnDemandClient<I, O>
where
I: Iterator<Item = Point>,
O: Observer,
{
pub state: State,
pub requests: Receiver<Point>,
pub requests: I,
pub observer: O,
}
impl<O> OnDemandClient<O>
impl<I, O> OnDemandClient<I, O>
where
I: Iterator<Item = Point>,
O: Observer,
{
pub fn initial(requests: Receiver<Point>, observer: O) -> Self {
pub fn initial(requests: I, observer: O) -> Self {
Self {
state: State::Idle,
requests,
@ -227,8 +227,8 @@ where
}
}
fn wait_for_request_and_send(self, tx: &impl MachineOutput) -> Transition<Self> {
let point = self.requests.recv()?;
fn send_request_range(self, tx: &impl MachineOutput, point: Point) -> Transition<Self> {
log::debug!("requesting block {:?}", point);
let msg = Message::RequestRange {
range: (point.clone(), point),
@ -242,6 +242,24 @@ where
})
}
fn dropout(self) -> Transition<Self> {
log::debug!("dropping out, channel will remain open");
Ok(Self {
state: State::Done,
..self
})
}
fn wait_for_request_and_send(mut self, tx: &impl MachineOutput) -> Transition<Self> {
let point = self.requests.next();
match point {
Some(x) => self.send_request_range(tx, x),
None => self.dropout(),
}
}
fn on_block(self, body: Vec<u8>) -> Transition<Self> {
log::debug!("received block body, size {}", body.len());
@ -251,16 +269,15 @@ where
}
}
impl<O> Agent for OnDemandClient<O>
impl<I, O> Agent for OnDemandClient<I, O>
where
I: Iterator<Item = Point>,
O: Observer,
{
type Message = Message;
// we're never done because we react to external work requests.
// TODO: see if we can inspect mpsc channel status and stop if disconnected
fn is_done(&self) -> bool {
false
self.state == State::Done
}
fn has_agency(&self) -> bool {

View file

@ -9,34 +9,44 @@ use crate::common::Point;
use super::{BlockContent, HeaderContent, Message, SkippedContent, State, Tip};
#[derive(Debug)]
pub enum Continuation {
Proceed,
DropOut,
Done,
}
/// An observer of chain-sync events sent by the state-machine
pub trait Observer<C> {
fn on_roll_forward(
&mut self,
_content: C,
tip: &Tip,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<Continuation, Box<dyn std::error::Error>> {
log::debug!("asked to roll forward, tip at {:?}", tip);
Ok(())
Ok(Continuation::Proceed)
}
fn on_intersect_found(
&mut self,
point: &Point,
tip: &Tip,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<Continuation, Box<dyn std::error::Error>> {
log::debug!("intersect was found {:?} (tip: {:?})", point, tip);
Ok(())
Ok(Continuation::Proceed)
}
fn on_rollback(&mut self, point: &Point) -> Result<(), Box<dyn std::error::Error>> {
fn on_rollback(&mut self, point: &Point) -> Result<Continuation, Box<dyn std::error::Error>> {
log::debug!("asked to roll back {:?}", point);
Ok(())
Ok(Continuation::Proceed)
}
fn on_tip_reached(&mut self) -> Result<(), Box<dyn std::error::Error>> {
fn on_tip_reached(&mut self) -> Result<Continuation, Box<dyn std::error::Error>> {
log::debug!("tip was reached");
Ok(())
Ok(Continuation::Proceed)
}
}
@ -55,6 +65,8 @@ where
pub intersect: Option<Point>,
pub tip: Option<Tip>,
continuation: Continuation,
observer: O,
_phantom: PhantomData<C>,
@ -71,6 +83,7 @@ where
intersect: None,
tip: None,
known_points,
continuation: Continuation::Proceed,
observer,
_phantom: PhantomData::default(),
}
@ -107,15 +120,38 @@ where
})
}
fn send_done(self, tx: &impl MachineOutput) -> Transition<Self> {
log::debug!("notifying done");
let msg = Message::<C>::Done;
tx.send_msg(&msg)?;
Ok(Self {
state: State::Done,
..self
})
}
fn dropout(self) -> Transition<Self> {
log::debug!("dropping out, channel will keep open");
Ok(Self {
state: State::Done,
..self
})
}
fn on_intersect_found(mut self, point: Point, tip: Tip) -> Transition<Self> {
log::debug!("intersect found: {:?} (tip: {:?})", point, tip);
self.observer.on_intersect_found(&point, &tip)?;
let continuation = self.observer.on_intersect_found(&point, &tip)?;
Ok(Self {
tip: Some(tip),
intersect: Some(point),
state: State::Idle,
continuation,
..self
})
}
@ -134,11 +170,12 @@ where
fn on_roll_forward(mut self, content: C, tip: Tip) -> Transition<Self> {
log::debug!("rolling forward");
self.observer.on_roll_forward(content, &tip)?;
let continuation = self.observer.on_roll_forward(content, &tip)?;
Ok(Self {
tip: Some(tip),
state: State::Idle,
continuation,
..self
})
}
@ -146,13 +183,13 @@ where
fn on_roll_backward(mut self, point: Point, tip: Tip) -> Transition<Self> {
log::debug!("rolling backward to point: {:?}", point);
log::debug!("reporting rollback to observer");
self.observer.on_rollback(&point)?;
let continuation = self.observer.on_rollback(&point)?;
Ok(Self {
tip: Some(tip),
intersect: Some(point),
state: State::Idle,
continuation,
..self
})
}
@ -160,10 +197,11 @@ where
fn on_await_reply(mut self) -> Transition<Self> {
log::debug!("reached tip, await reply");
self.observer.on_tip_reached()?;
let continuation = self.observer.on_tip_reached()?;
Ok(Self {
state: State::MustReply,
continuation,
..self
})
}
@ -192,6 +230,12 @@ where
}
fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
match self.continuation {
Continuation::Done => return self.send_done(tx),
Continuation::DropOut => return self.dropout(),
_ => (),
};
match self.state {
State::Idle => match self.intersect {
// keep going from pointer