Allow polymorphic content in chain-sync

This commit is contained in:
Santiago Carmuega 2021-11-24 07:09:37 -03:00
parent 91372b99ac
commit 7f50bebafd
3 changed files with 90 additions and 25 deletions

View file

@ -1,4 +1,4 @@
use pallas_chainsync::{Consumer, Point};
use pallas_chainsync::{ClientConsumer, Point};
use pallas_handshake::n2c::{Client, VersionTable};
use pallas_handshake::MAINNET_MAGIC;
use pallas_machines::run_agent;
@ -8,6 +8,8 @@ use std::os::unix::net::UnixStream;
fn main() {
env_logger::init();
// we connect to the unix socket of the local node. Make sure you have the right
// path for your environment
let bearer = UnixStream::connect("/tmp/node.socket").unwrap();
let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 4, 5]).unwrap();
@ -24,7 +26,7 @@ fn main() {
)];
let (cs_rx, cs_tx) = muxer.use_channel(5);
let cs = Consumer::initial(known_points);
let cs = ClientConsumer::initial(known_points);
let cs = run_agent(cs, cs_rx, &cs_tx).unwrap();
println!("{:?}", cs);
}

View file

@ -1,7 +1,7 @@
use net2::TcpStreamExt;
use std::net::TcpStream;
use pallas_chainsync::{Consumer, Point};
use pallas_chainsync::{ClientConsumer, Point};
use pallas_handshake::n2n::{Client, VersionTable};
use pallas_handshake::MAINNET_MAGIC;
use pallas_machines::run_agent;
@ -29,7 +29,7 @@ fn main() {
let (cs_rx, cs_tx) = muxer.use_channel(2);
let cs = Consumer::initial(known_points);
let cs = ClientConsumer::initial(known_points);
let cs = run_agent(cs, cs_rx, &cs_tx).unwrap();
println!("{:?}", cs);

View file

@ -1,7 +1,8 @@
use std::fmt::Debug;
use log::{debug, log_enabled, trace, warn};
use log::{debug, log_enabled, trace};
use minicbor::data::Tag;
use pallas_machines::{
Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder,
PayloadEncoder, Transition,
@ -36,7 +37,48 @@ impl DecodePayload for Point {
}
}
pub type WrappedHeader = Vec<u8>;
#[derive(Debug)]
pub struct WrappedHeader(u64, Vec<u8>);
impl EncodePayload for WrappedHeader {
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
e.array(2)?;
e.u64(self.0)?;
e.tag(Tag::Cbor)?;
e.bytes(&self.1)?;
Ok(())
}
}
impl DecodePayload for WrappedHeader {
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.array()?;
let unknown = d.u64()?; // WTF is this value?
d.tag()?;
let bytes = Vec::from(d.bytes()?);
Ok(WrappedHeader(unknown, bytes))
}
}
#[derive(Debug)]
pub struct BlockBody(Vec<u8>);
impl EncodePayload for BlockBody {
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
todo!()
}
}
impl DecodePayload for BlockBody {
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.tag()?;
let bytes = Vec::from(d.bytes()?);
Ok(BlockBody(bytes))
}
}
#[derive(Debug)]
pub struct Tip(Point, u64);
@ -70,11 +112,15 @@ pub enum State {
Done,
}
/// A generic chain-sync message for either header or block content
#[derive(Debug)]
pub enum Message {
pub enum Message<C>
where
C: EncodePayload + DecodePayload,
{
RequestNext,
AwaitReply,
RollForward(WrappedHeader, Tip),
RollForward(C, Tip),
RollBackward(Point, Tip),
FindIntersect(Vec<Point>),
IntersectFound(Point, Tip),
@ -82,7 +128,10 @@ pub enum Message {
Done,
}
impl EncodePayload for Message {
impl<C> EncodePayload for Message<C>
where
C: EncodePayload + DecodePayload,
{
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
match self {
Message::RequestNext => {
@ -95,7 +144,7 @@ impl EncodePayload for Message {
}
Message::RollForward(header, tip) => {
e.array(3)?.u16(2)?;
e.bytes(&header)?;
header.encode_payload(e)?;
tip.encode_payload(e)?;
Ok(())
}
@ -132,7 +181,10 @@ impl EncodePayload for Message {
}
}
impl DecodePayload for Message {
impl<C> DecodePayload for Message<C>
where
C: EncodePayload + DecodePayload,
{
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.array()?;
let label = d.u16()?;
@ -141,12 +193,9 @@ impl DecodePayload for Message {
0 => Ok(Message::RequestNext),
1 => Ok(Message::AwaitReply),
2 => {
warn!("{:?}", d.array()?);
warn!("{:?}", d.u8()?);
warn!("{:?}", d.tag()?);
let header = Vec::from(d.bytes()?);
let content = C::decode_payload(d)?;
let tip = Tip::decode_payload(d)?;
Ok(Message::RollForward(header, tip))
Ok(Message::RollForward(content, tip))
}
3 => {
let point = Point::decode_payload(d)?;
@ -173,25 +222,32 @@ impl DecodePayload for Message {
}
#[derive(Debug)]
pub struct Consumer {
pub struct Consumer<C> {
pub state: State,
pub known_points: Vec<Point>,
pub cursor: Option<Point>,
pub tip: Option<Tip>,
// as recommended here: https://doc.rust-lang.org/error-index.html#E0207
_phantom: Option<C>,
}
impl Consumer {
impl<C> Consumer<C>
where
C: EncodePayload + DecodePayload + Debug,
{
pub fn initial(known_points: Vec<Point>) -> Self {
Self {
state: State::Idle,
cursor: None,
tip: None,
known_points,
_phantom: None,
}
}
fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition<Self> {
let msg = Message::FindIntersect(self.known_points.clone());
let msg = Message::<C>::FindIntersect(self.known_points.clone());
tx.send_msg(&msg)?;
@ -202,7 +258,7 @@ impl Consumer {
}
fn send_request_next(self, tx: &impl MachineOutput) -> Transition<Self> {
let msg = Message::RequestNext;
let msg = Message::<C>::RequestNext;
tx.send_msg(&msg)?;
@ -234,11 +290,11 @@ impl Consumer {
})
}
fn on_roll_forward(self, header: Vec<u8>, tip: Tip) -> Transition<Self> {
debug!("rolling forward: {:?}", header);
fn on_roll_forward(self, content: C, tip: Tip) -> Transition<Self> {
debug!("rolling forward");
if log_enabled!(log::Level::Trace) {
trace!("header: {}", hex::encode(&header));
trace!("header: {:?}", content);
}
Ok(Self {
@ -260,8 +316,11 @@ impl Consumer {
}
}
impl Agent for Consumer {
type Message = Message;
impl<C> Agent for Consumer<C>
where
C: EncodePayload + DecodePayload + Debug,
{
type Message = Message<C>;
fn is_done(&self) -> bool {
self.state == State::Done
@ -309,3 +368,7 @@ impl Agent for Consumer {
}
}
}
pub type NodeConsumer = Consumer<WrappedHeader>;
pub type ClientConsumer = Consumer<BlockBody>;