chore: Simplify ChainSync agent logic (#48)

This commit is contained in:
Santiago Carmuega 2022-02-14 18:30:16 -03:00 committed by GitHub
parent c3662e199d
commit 630d53debc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 145 additions and 109 deletions

View file

@ -1,7 +1,7 @@
use pallas_primitives::alonzo::{crypto, Block, BlockWrapper};
use pallas_primitives::alonzo::{Block, BlockWrapper};
use pallas_primitives::Fragment;
use pallas_miniprotocols::chainsync::{BlockLike, Consumer, NoopObserver};
use pallas_miniprotocols::chainsync::{Consumer, NoopObserver};
use pallas_miniprotocols::handshake::n2c::{Client, VersionTable};
use pallas_miniprotocols::{run_agent, Point, MAINNET_MAGIC};
use pallas_miniprotocols::{DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder};
@ -25,14 +25,6 @@ impl DecodePayload for Content {
Ok(Content(block))
}
}
impl BlockLike for Content {
fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>> {
let hash = crypto::hash_block_header(&self.0.header);
Ok(Point(self.0.header.header_body.slot, hash.to_vec()))
}
}
fn main() {
env_logger::init();

View file

@ -1,12 +1,12 @@
use minicbor::data::Tag;
use net2::TcpStreamExt;
use pallas_primitives::alonzo::{crypto, Header};
use pallas_primitives::alonzo::Header;
use pallas_primitives::Fragment;
use pallas_miniprotocols::Point;
use std::net::TcpStream;
use pallas_miniprotocols::chainsync::{BlockLike, Consumer, NoopObserver};
use pallas_miniprotocols::chainsync::{Consumer, NoopObserver};
use pallas_miniprotocols::handshake::n2n::{Client, VersionTable};
use pallas_miniprotocols::{
run_agent, DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder, MAINNET_MAGIC,
@ -38,13 +38,6 @@ impl DecodePayload for Content {
}
}
impl BlockLike for Content {
fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>> {
let hash = crypto::hash_block_header(&self.1);
Ok(Point(self.1.header_body.slot, hash.to_vec()))
}
}
fn main() {
env_logger::init();

View file

@ -1,37 +1,20 @@
use std::fmt::Debug;
use std::marker::PhantomData;
use log::{debug, log_enabled, trace};
use log::debug;
use crate::machines::{
Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, Transition,
};
use crate::machines::{Agent, MachineError, MachineOutput, Transition};
use crate::{DecodePayload, EncodePayload};
use crate::common::Point;
use crate::payloads::{PayloadDecoder, PayloadEncoder};
use super::{Message, State, Tip};
/// A trait to deal with polymorphic payloads in the ChainSync protocol
/// (WrappedHeader vs BlockBody)
pub trait BlockLike: EncodePayload + DecodePayload + Debug {
fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>>;
}
use super::{BlockContent, HeaderContent, Message, SkippedContent, State, Tip};
/// An observer of chain-sync events sent by the state-machine
pub trait Observer<C>
where
C: Debug,
{
fn on_block(
&self,
cursor: &Option<Point>,
content: &C,
) -> Result<(), Box<dyn std::error::Error>> {
log::debug!(
"asked to save block content {:?} at cursor {:?}",
content,
cursor
);
pub trait Observer<C> {
fn on_roll_forward(&self, _content: C, tip: &Tip) -> Result<(), Box<dyn std::error::Error>> {
log::debug!("asked to roll forward, tip at {:?}", tip);
Ok(())
}
@ -57,43 +40,42 @@ where
#[derive(Debug)]
pub struct NoopObserver {}
impl<C> Observer<C> for NoopObserver where C: Debug {}
impl<C> Observer<C> for NoopObserver {}
#[derive(Debug)]
pub struct Consumer<C, O>
where
O: Observer<C>,
C: Debug,
{
pub state: State,
pub known_points: Vec<Point>,
pub cursor: Option<Point>,
pub intersect: Option<Point>,
pub tip: Option<Tip>,
observer: O,
// as recommended here: https://doc.rust-lang.org/error-index.html#E0207
_phantom: Option<C>,
_phantom: PhantomData<C>,
}
impl<C, O> Consumer<C, O>
where
C: BlockLike + EncodePayload + DecodePayload + Debug,
O: Observer<C>,
C: DecodePayload + EncodePayload,
{
pub fn initial(known_points: Vec<Point>, observer: O) -> Self {
Self {
state: State::Idle,
cursor: None,
intersect: None,
tip: None,
known_points,
observer,
_phantom: None,
_phantom: PhantomData::default(),
}
}
fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition<Self> {
debug!("requesting find intersect");
let msg = Message::<C>::FindIntersect(self.known_points.clone());
tx.send_msg(&msg)?;
@ -105,6 +87,8 @@ where
}
fn send_request_next(self, tx: &impl MachineOutput) -> Transition<Self> {
debug!("requesting next");
let msg = Message::<C>::RequestNext;
tx.send_msg(&msg)?;
@ -122,7 +106,7 @@ where
Ok(Self {
tip: Some(tip),
cursor: Some(point),
intersect: Some(point),
state: State::Idle,
..self
})
@ -133,7 +117,7 @@ where
Ok(Self {
tip: Some(tip),
cursor: None,
intersect: None,
state: State::Done,
..self
})
@ -142,17 +126,9 @@ where
fn on_roll_forward(self, content: C, tip: Tip) -> Transition<Self> {
debug!("rolling forward");
let point = content.block_point()?;
if log_enabled!(log::Level::Trace) {
trace!("content: {:?}", content);
}
debug!("reporting block to observer");
self.observer.on_block(&self.cursor, &content)?;
self.observer.on_roll_forward(content, &tip)?;
Ok(Self {
cursor: Some(point),
tip: Some(tip),
state: State::Idle,
..self
@ -167,7 +143,7 @@ where
Ok(Self {
tip: Some(tip),
cursor: Some(point),
intersect: Some(point),
state: State::Idle,
..self
})
@ -176,7 +152,6 @@ where
fn on_await_reply(self) -> Transition<Self> {
debug!("reached tip, await reply");
debug!("reporting tip to observer");
self.observer.on_tip_reached()?;
Ok(Self {
@ -188,7 +163,7 @@ where
impl<C, O> Agent for Consumer<C, O>
where
C: BlockLike + EncodePayload + DecodePayload + Debug + 'static,
C: EncodePayload + DecodePayload + Debug + 'static,
O: Observer<C>,
{
type Message = Message<C>;
@ -209,7 +184,7 @@ where
fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
match self.state {
State::Idle => match self.cursor {
State::Idle => match self.intersect {
Some(_) => self.send_request_next(tx),
None => self.send_find_intersect(tx),
},
@ -226,8 +201,8 @@ where
self.on_roll_backward(point, tip)
}
(State::CanAwait, Message::AwaitReply) => self.on_await_reply(),
(State::MustReply, Message::RollForward(header, tip)) => {
self.on_roll_forward(header, tip)
(State::MustReply, Message::RollForward(content, tip)) => {
self.on_roll_forward(content, tip)
}
(State::MustReply, Message::RollBackward(point, tip)) => {
self.on_roll_backward(point, tip)
@ -258,7 +233,7 @@ impl TipFinder {
}
fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition<Self> {
let msg = Message::<NoopContent>::FindIntersect(vec![self.wellknown_point.clone()]);
let msg = Message::<SkippedContent>::FindIntersect(vec![self.wellknown_point.clone()]);
tx.send_msg(&msg)?;
@ -289,29 +264,12 @@ impl TipFinder {
}
}
#[derive(Debug)]
pub struct NoopContent {}
pub type HeaderConsumer<O> = Consumer<HeaderContent, O>;
impl EncodePayload for NoopContent {
fn encode_payload(&self, _e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
todo!()
}
}
impl DecodePayload for NoopContent {
fn decode_payload(_d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
todo!()
}
}
impl BlockLike for NoopContent {
fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>> {
todo!()
}
}
pub type BlockConsumer<O> = Consumer<BlockContent, O>;
impl Agent for TipFinder {
type Message = Message<NoopContent>;
type Message = Message<SkippedContent>;
fn is_done(&self) -> bool {
self.state == State::Done

View file

@ -1,7 +1,7 @@
use crate::common::Point;
use crate::machines::{CodecError, DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder};
use super::{Message, Tip};
use super::{BlockContent, HeaderContent, Message, SkippedContent, Tip};
impl EncodePayload for Tip {
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
@ -25,7 +25,7 @@ impl DecodePayload for Tip {
impl<C> EncodePayload for Message<C>
where
C: EncodePayload + DecodePayload,
C: EncodePayload,
{
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
match self {
@ -37,9 +37,9 @@ where
e.array(1)?.u16(1)?;
Ok(())
}
Message::RollForward(header, tip) => {
Message::RollForward(content, tip) => {
e.array(3)?.u16(2)?;
header.encode_payload(e)?;
content.encode_payload(e)?;
tip.encode_payload(e)?;
Ok(())
}
@ -78,7 +78,7 @@ where
impl<C> DecodePayload for Message<C>
where
C: EncodePayload + DecodePayload,
C: DecodePayload,
{
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.array()?;
@ -115,3 +115,72 @@ where
}
}
}
impl DecodePayload for HeaderContent {
fn decode_payload(d: &mut crate::PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.array()?;
let variant = d.u32()?; // WTF is this value?
match variant {
// byron
0 => {
d.array()?;
// can't find a reference anywhere about the structure of these values, but they
// seem to provide the Byron-specific variant of the header
let (a, b): (u8, u64) = d.decode()?;
d.tag()?;
let bytes = d.bytes()?;
Ok(HeaderContent::Byron(a, b, Vec::from(bytes)))
}
// shelley
_ => {
d.tag()?;
let bytes = d.bytes()?;
Ok(HeaderContent::Shelley(Vec::from(bytes)))
}
}
}
}
impl EncodePayload for HeaderContent {
fn encode_payload(
&self,
_e: &mut crate::PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
todo!()
}
}
impl DecodePayload for BlockContent {
fn decode_payload(d: &mut crate::PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.tag()?;
Ok(BlockContent(d.decode()?))
}
}
impl EncodePayload for BlockContent {
fn encode_payload(
&self,
_e: &mut crate::PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
todo!()
}
}
impl DecodePayload for SkippedContent {
fn decode_payload(d: &mut crate::PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.skip()?;
Ok(SkippedContent)
}
}
impl EncodePayload for SkippedContent {
fn encode_payload(
&self,
_e: &mut crate::PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
}

View file

@ -1,7 +1,6 @@
use std::fmt::Debug;
use std::{fmt::Debug, ops::Deref};
use crate::common::Point;
use crate::machines::{DecodePayload, EncodePayload};
#[derive(Debug)]
pub struct Tip(pub Point, pub u64);
@ -17,10 +16,7 @@ pub enum State {
/// A generic chain-sync message for either header or block content
#[derive(Debug)]
pub enum Message<C>
where
C: EncodePayload + DecodePayload + Sized,
{
pub enum Message<C> {
RequestNext,
AwaitReply,
RollForward(C, Tip),
@ -30,3 +26,23 @@ where
IntersectNotFound(Tip),
Done,
}
#[derive(Debug)]
pub enum HeaderContent {
Byron(u8, u64, Vec<u8>),
Shelley(Vec<u8>),
}
#[derive(Debug)]
pub struct BlockContent(pub Vec<u8>);
impl Deref for BlockContent {
type Target = [u8];
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[derive(Debug)]
pub struct SkippedContent;

View file

@ -142,14 +142,22 @@ impl<'a> PayloadDeconstructor<'a> {
debug!("consumed {} from payload buffer", new_pos);
Ok(t)
}
Err(_err) => {
//TODO: we need to match EndOfInput kind of errors
Err(err) => {
let downcast = err.downcast::<minicbor::decode::Error>();
debug!("payload incomplete, fetching next segment");
let payload = self.rx.recv()?;
self.remaining.extend(payload);
match downcast {
Ok(err) => match err.deref() {
minicbor::decode::Error::EndOfInput => {
debug!("payload incomplete, fetching next segment");
let payload = self.rx.recv()?;
self.remaining.extend(payload);
self.consume_next_message::<T>()
self.consume_next_message::<T>()
}
_ => Err(err),
},
Err(err) => Err(err),
}
}
}
}