diff --git a/pallas-chainsync/examples/blocks.rs b/pallas-chainsync/examples/blocks.rs index c2e0d60..84247ce 100644 --- a/pallas-chainsync/examples/blocks.rs +++ b/pallas-chainsync/examples/blocks.rs @@ -1,4 +1,4 @@ -use pallas_chainsync::{Consumer, Point}; +use pallas_chainsync::{ClientConsumer, Point}; use pallas_handshake::n2c::{Client, VersionTable}; use pallas_handshake::MAINNET_MAGIC; use pallas_machines::run_agent; @@ -8,6 +8,8 @@ use std::os::unix::net::UnixStream; fn main() { env_logger::init(); + // we connect to the unix socket of the local node. Make sure you have the right + // path for your environment let bearer = UnixStream::connect("/tmp/node.socket").unwrap(); let mut muxer = Multiplexer::try_setup(bearer, &vec![0, 4, 5]).unwrap(); @@ -24,7 +26,7 @@ fn main() { )]; let (cs_rx, cs_tx) = muxer.use_channel(5); - let cs = Consumer::initial(known_points); + let cs = ClientConsumer::initial(known_points); let cs = run_agent(cs, cs_rx, &cs_tx).unwrap(); println!("{:?}", cs); } diff --git a/pallas-chainsync/examples/headers.rs b/pallas-chainsync/examples/headers.rs index 8e3233e..5f9502e 100644 --- a/pallas-chainsync/examples/headers.rs +++ b/pallas-chainsync/examples/headers.rs @@ -1,7 +1,7 @@ use net2::TcpStreamExt; use std::net::TcpStream; -use pallas_chainsync::{Consumer, Point}; +use pallas_chainsync::{ClientConsumer, Point}; use pallas_handshake::n2n::{Client, VersionTable}; use pallas_handshake::MAINNET_MAGIC; use pallas_machines::run_agent; @@ -29,7 +29,7 @@ fn main() { let (cs_rx, cs_tx) = muxer.use_channel(2); - let cs = Consumer::initial(known_points); + let cs = ClientConsumer::initial(known_points); let cs = run_agent(cs, cs_rx, &cs_tx).unwrap(); println!("{:?}", cs); diff --git a/pallas-chainsync/src/lib.rs b/pallas-chainsync/src/lib.rs index d2143f9..6db809e 100644 --- a/pallas-chainsync/src/lib.rs +++ b/pallas-chainsync/src/lib.rs @@ -1,7 +1,8 @@ use std::fmt::Debug; -use log::{debug, log_enabled, trace, warn}; +use log::{debug, log_enabled, trace}; +use minicbor::data::Tag; use pallas_machines::{ Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder, PayloadEncoder, Transition, @@ -36,7 +37,48 @@ impl DecodePayload for Point { } } -pub type WrappedHeader = Vec; +#[derive(Debug)] +pub struct WrappedHeader(u64, Vec); + +impl EncodePayload for WrappedHeader { + fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { + e.array(2)?; + e.u64(self.0)?; + e.tag(Tag::Cbor)?; + e.bytes(&self.1)?; + + Ok(()) + } +} + +impl DecodePayload for WrappedHeader { + fn decode_payload(d: &mut PayloadDecoder) -> Result> { + d.array()?; + let unknown = d.u64()?; // WTF is this value? + d.tag()?; + let bytes = Vec::from(d.bytes()?); + + Ok(WrappedHeader(unknown, bytes)) + } +} + +#[derive(Debug)] +pub struct BlockBody(Vec); + +impl EncodePayload for BlockBody { + fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { + todo!() + } +} + +impl DecodePayload for BlockBody { + fn decode_payload(d: &mut PayloadDecoder) -> Result> { + d.tag()?; + let bytes = Vec::from(d.bytes()?); + + Ok(BlockBody(bytes)) + } +} #[derive(Debug)] pub struct Tip(Point, u64); @@ -70,11 +112,15 @@ pub enum State { Done, } +/// A generic chain-sync message for either header or block content #[derive(Debug)] -pub enum Message { +pub enum Message +where + C: EncodePayload + DecodePayload, +{ RequestNext, AwaitReply, - RollForward(WrappedHeader, Tip), + RollForward(C, Tip), RollBackward(Point, Tip), FindIntersect(Vec), IntersectFound(Point, Tip), @@ -82,7 +128,10 @@ pub enum Message { Done, } -impl EncodePayload for Message { +impl EncodePayload for Message +where + C: EncodePayload + DecodePayload, +{ fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { match self { Message::RequestNext => { @@ -95,7 +144,7 @@ impl EncodePayload for Message { } Message::RollForward(header, tip) => { e.array(3)?.u16(2)?; - e.bytes(&header)?; + header.encode_payload(e)?; tip.encode_payload(e)?; Ok(()) } @@ -132,7 +181,10 @@ impl EncodePayload for Message { } } -impl DecodePayload for Message { +impl DecodePayload for Message +where + C: EncodePayload + DecodePayload, +{ fn decode_payload(d: &mut PayloadDecoder) -> Result> { d.array()?; let label = d.u16()?; @@ -141,12 +193,9 @@ impl DecodePayload for Message { 0 => Ok(Message::RequestNext), 1 => Ok(Message::AwaitReply), 2 => { - warn!("{:?}", d.array()?); - warn!("{:?}", d.u8()?); - warn!("{:?}", d.tag()?); - let header = Vec::from(d.bytes()?); + let content = C::decode_payload(d)?; let tip = Tip::decode_payload(d)?; - Ok(Message::RollForward(header, tip)) + Ok(Message::RollForward(content, tip)) } 3 => { let point = Point::decode_payload(d)?; @@ -173,25 +222,32 @@ impl DecodePayload for Message { } #[derive(Debug)] -pub struct Consumer { +pub struct Consumer { pub state: State, pub known_points: Vec, pub cursor: Option, pub tip: Option, + + // as recommended here: https://doc.rust-lang.org/error-index.html#E0207 + _phantom: Option, } -impl Consumer { +impl Consumer +where + C: EncodePayload + DecodePayload + Debug, +{ pub fn initial(known_points: Vec) -> Self { Self { state: State::Idle, cursor: None, tip: None, known_points, + _phantom: None, } } fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition { - let msg = Message::FindIntersect(self.known_points.clone()); + let msg = Message::::FindIntersect(self.known_points.clone()); tx.send_msg(&msg)?; @@ -202,7 +258,7 @@ impl Consumer { } fn send_request_next(self, tx: &impl MachineOutput) -> Transition { - let msg = Message::RequestNext; + let msg = Message::::RequestNext; tx.send_msg(&msg)?; @@ -234,11 +290,11 @@ impl Consumer { }) } - fn on_roll_forward(self, header: Vec, tip: Tip) -> Transition { - debug!("rolling forward: {:?}", header); + fn on_roll_forward(self, content: C, tip: Tip) -> Transition { + debug!("rolling forward"); if log_enabled!(log::Level::Trace) { - trace!("header: {}", hex::encode(&header)); + trace!("header: {:?}", content); } Ok(Self { @@ -260,8 +316,11 @@ impl Consumer { } } -impl Agent for Consumer { - type Message = Message; +impl Agent for Consumer +where + C: EncodePayload + DecodePayload + Debug, +{ + type Message = Message; fn is_done(&self) -> bool { self.state == State::Done @@ -309,3 +368,7 @@ impl Agent for Consumer { } } } + +pub type NodeConsumer = Consumer; + +pub type ClientConsumer = Consumer;