Start using 'observer' concept in mini protocols
This commit is contained in:
parent
afd1d9e7c5
commit
54ce8a8027
4 changed files with 31 additions and 22 deletions
|
|
@ -117,7 +117,7 @@ pub type Mint = Multiasset<i64>;
|
|||
|
||||
pub type Coin = u64;
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub enum Value {
|
||||
Coin(Coin),
|
||||
Multiasset(Coin, Multiasset<u64>),
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
use pallas_chainsync::{ClientConsumer, NoopStorage};
|
||||
use pallas_chainsync::{ClientConsumer, NoopObserver};
|
||||
use pallas_handshake::n2c::{Client, VersionTable};
|
||||
use pallas_handshake::MAINNET_MAGIC;
|
||||
use pallas_machines::primitives::Point;
|
||||
|
|
@ -27,7 +27,7 @@ fn main() {
|
|||
)];
|
||||
|
||||
let cs_channel = muxer.use_channel(5);
|
||||
let cs = ClientConsumer::initial(known_points, NoopStorage { });
|
||||
let cs = ClientConsumer::initial(known_points, NoopObserver { });
|
||||
let cs = run_agent(cs, cs_channel).unwrap();
|
||||
println!("{:?}", cs);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ use net2::TcpStreamExt;
|
|||
use pallas_machines::primitives::Point;
|
||||
use std::net::TcpStream;
|
||||
|
||||
use pallas_chainsync::{ClientConsumer, NoopStorage};
|
||||
use pallas_chainsync::{ClientConsumer, NoopObserver};
|
||||
use pallas_handshake::n2n::{Client, VersionTable};
|
||||
use pallas_handshake::MAINNET_MAGIC;
|
||||
use pallas_machines::run_agent;
|
||||
|
|
@ -29,7 +29,7 @@ fn main() {
|
|||
|
||||
let cs_channel = muxer.use_channel(2);
|
||||
|
||||
let cs = ClientConsumer::initial(known_points, NoopStorage {});
|
||||
let cs = ClientConsumer::initial(known_points, NoopObserver {});
|
||||
let cs = run_agent(cs, cs_channel).unwrap();
|
||||
|
||||
println!("{:?}", cs);
|
||||
|
|
|
|||
|
|
@ -189,52 +189,58 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// An abstraction of a component in charge of block persistence
|
||||
pub trait Storage<C> {
|
||||
fn save_block(&self, content: &C) -> Result<(), Box<dyn std::error::Error>>;
|
||||
/// An observer of chain-sync events sent by the state-machine
|
||||
pub trait Observer<C> {
|
||||
fn on_block(&self, content: &C) -> Result<(), Box<dyn std::error::Error>>;
|
||||
fn on_rollback(&self, point: &Point) -> Result<(), Box<dyn std::error::Error>>;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct NoopStorage {}
|
||||
pub struct NoopObserver {}
|
||||
|
||||
impl<C> Storage<C> for NoopStorage
|
||||
impl<C> Observer<C> for NoopObserver
|
||||
where
|
||||
C: Debug,
|
||||
{
|
||||
fn save_block(&self, content: &C) -> Result<(), Box<dyn std::error::Error>> {
|
||||
fn on_block(&self, content: &C) -> Result<(), Box<dyn std::error::Error>> {
|
||||
log::warn!("asked to save block {:?}", content);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn on_rollback(&self, point: &Point) -> Result<(), Box<dyn std::error::Error>> {
|
||||
log::warn!("asked to roll back {:?}", point);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Consumer<C, S>
|
||||
pub struct Consumer<C, O>
|
||||
where
|
||||
S: Storage<C>,
|
||||
O: Observer<C>,
|
||||
{
|
||||
pub state: State,
|
||||
pub known_points: Vec<Point>,
|
||||
pub cursor: Option<Point>,
|
||||
pub tip: Option<Tip>,
|
||||
|
||||
storage: S,
|
||||
observer: O,
|
||||
|
||||
// as recommended here: https://doc.rust-lang.org/error-index.html#E0207
|
||||
_phantom: Option<C>,
|
||||
}
|
||||
|
||||
impl<C, S> Consumer<C, S>
|
||||
impl<C, O> Consumer<C, O>
|
||||
where
|
||||
C: EncodePayload + DecodePayload + Debug,
|
||||
S: Storage<C>,
|
||||
O: Observer<C>,
|
||||
{
|
||||
pub fn initial(known_points: Vec<Point>, storage: S) -> Self {
|
||||
pub fn initial(known_points: Vec<Point>, observer: O) -> Self {
|
||||
Self {
|
||||
state: State::Idle,
|
||||
cursor: None,
|
||||
tip: None,
|
||||
known_points,
|
||||
storage,
|
||||
observer,
|
||||
|
||||
_phantom: None,
|
||||
}
|
||||
|
|
@ -291,8 +297,8 @@ where
|
|||
trace!("content: {:?}", content);
|
||||
}
|
||||
|
||||
debug!("saving block");
|
||||
self.storage.save_block(&content)?;
|
||||
debug!("reporint block to observer");
|
||||
self.observer.on_block(&content)?;
|
||||
|
||||
Ok(Self {
|
||||
tip: Some(tip),
|
||||
|
|
@ -304,6 +310,9 @@ where
|
|||
fn on_roll_backward(self, point: Point, tip: Tip) -> Transition<Self> {
|
||||
debug!("rolling backward to point: {:?}", point);
|
||||
|
||||
debug!("reporting rollback to observer");
|
||||
self.observer.on_rollback(&point)?;
|
||||
|
||||
Ok(Self {
|
||||
tip: Some(tip),
|
||||
cursor: Some(point),
|
||||
|
|
@ -322,10 +331,10 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<C, S> Agent for Consumer<C, S>
|
||||
impl<C, O> Agent for Consumer<C, O>
|
||||
where
|
||||
C: EncodePayload + DecodePayload + Debug + 'static,
|
||||
S: Storage<C>,
|
||||
O: Observer<C>,
|
||||
{
|
||||
type Message = Message<C>;
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue