From b2335692ee0380d9f2b0588a4db1a788d4147154 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sun, 21 Nov 2021 12:05:33 -0300 Subject: [PATCH] Add first-pass chain-sync implementation --- Cargo.toml | 1 + pallas-chainsync/.gitignore | 2 + pallas-chainsync/Cargo.toml | 24 ++ pallas-chainsync/examples/blocks.rs | 37 ++++ pallas-chainsync/examples/headers.rs | 37 ++++ pallas-chainsync/src/lib.rs | 317 +++++++++++++++++++++++++++ 6 files changed, 418 insertions(+) create mode 100644 pallas-chainsync/.gitignore create mode 100644 pallas-chainsync/Cargo.toml create mode 100644 pallas-chainsync/examples/blocks.rs create mode 100644 pallas-chainsync/examples/headers.rs create mode 100644 pallas-chainsync/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 726fe03..7d55aeb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,5 +5,6 @@ members = [ "pallas-machines", "pallas-handshake", "pallas-blockfetch", + "pallas-chainsync", "pallas", ] \ No newline at end of file diff --git a/pallas-chainsync/.gitignore b/pallas-chainsync/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/pallas-chainsync/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/pallas-chainsync/Cargo.toml b/pallas-chainsync/Cargo.toml new file mode 100644 index 0000000..33d9f37 --- /dev/null +++ b/pallas-chainsync/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "pallas-chainsync" +version = "0.1.0" +edition = "2021" +repository = "https://github.com/txpipe/pallas" +license = "Apache-2.0" +authors = [ + "Santiago Carmuega " +] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +pallas-multiplexer = { path = "../pallas-multiplexer/" } +pallas-machines = { path = "../pallas-machines/" } +minicbor = { version="0.11.4", features=["half"] } +minicbor-io = "0.6.0" +log = "0.4.14" +hex = "0.4.3" + +[dev-dependencies] +net2 = "0.2.37" +env_logger = "0.9.0" +pallas-handshake = { path = "../pallas-handshake/" } diff --git a/pallas-chainsync/examples/blocks.rs b/pallas-chainsync/examples/blocks.rs new file mode 100644 index 0000000..d11e257 --- /dev/null +++ b/pallas-chainsync/examples/blocks.rs @@ -0,0 +1,37 @@ +use net2::TcpStreamExt; +use std::net::TcpStream; + +use pallas_chainsync::{Consumer, Point}; +use pallas_handshake::n2c::{Client, VersionTable}; +use pallas_handshake::MAINNET_MAGIC; +use pallas_machines::run_agent; +use pallas_multiplexer::Multiplexer; + +fn main() { + env_logger::init(); + + //let bearer = TcpStream::connect("localhost:6000").unwrap(); + let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); + + bearer.set_nodelay(true).unwrap(); + bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); + + let mut handles = Multiplexer::new(bearer, &vec![0, 5]).unwrap(); + let (_, rx, tx) = handles.remove(0); + + let versions = VersionTable::v1_and_above(MAINNET_MAGIC); + let last = run_agent(Client::initial(versions), rx, &tx).unwrap(); + println!("{:?}", last); + + let known_points = vec![Point( + 43847831u64, + hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(), + )]; + + let (_, cs_rx, cs_tx) = handles.remove(0); + + let cs = Consumer::initial(known_points); + let cs = run_agent(cs, cs_rx, &cs_tx).unwrap(); + + println!("{:?}", cs); +} diff --git a/pallas-chainsync/examples/headers.rs b/pallas-chainsync/examples/headers.rs new file mode 100644 index 0000000..9800162 --- /dev/null +++ b/pallas-chainsync/examples/headers.rs @@ -0,0 +1,37 @@ +use net2::TcpStreamExt; +use std::net::TcpStream; + +use pallas_chainsync::{Consumer, Point}; +use pallas_handshake::n2n::{Client, VersionTable}; +use pallas_handshake::MAINNET_MAGIC; +use pallas_machines::run_agent; +use pallas_multiplexer::Multiplexer; + +fn main() { + env_logger::init(); + + //let bearer = TcpStream::connect("localhost:6000").unwrap(); + let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); + + bearer.set_nodelay(true).unwrap(); + bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); + + let mut handles = Multiplexer::new(bearer, &vec![0, 2]).unwrap(); + let (_, rx, tx) = handles.remove(0); + + let versions = VersionTable::v4_and_above(MAINNET_MAGIC); + let last = run_agent(Client::initial(versions), rx, &tx).unwrap(); + println!("{:?}", last); + + let known_points = vec![Point( + 43847831u64, + hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(), + )]; + + let (_, cs_rx, cs_tx) = handles.remove(0); + + let cs = Consumer::initial(known_points); + let cs = run_agent(cs, cs_rx, &cs_tx).unwrap(); + + println!("{:?}", cs); +} diff --git a/pallas-chainsync/src/lib.rs b/pallas-chainsync/src/lib.rs new file mode 100644 index 0000000..55bfbdd --- /dev/null +++ b/pallas-chainsync/src/lib.rs @@ -0,0 +1,317 @@ +use std::fmt::Debug; + +use log::{debug, log_enabled, trace, warn}; +use minicbor::encode; +use pallas_machines::{ + Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder, + PayloadEncoder, Transition, +}; + +#[derive(Clone)] +pub struct Point(pub u64, pub Vec); + +impl Debug for Point { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("Point") + .field(&self.0) + .field(&hex::encode(&self.1)) + .finish() + } +} + +impl EncodePayload for Point { + fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { + e.array(2)?.u64(self.0)?.bytes(&self.1)?; + Ok(()) + } +} + +impl DecodePayload for Point { + fn decode_payload(d: &mut PayloadDecoder) -> Result> { + d.array()?; + let slot = d.u64()?; + let hash = d.bytes()?; + + Ok(Point(slot, Vec::from(hash))) + } +} + +pub type WrappedHeader = Vec; + +#[derive(Debug)] +pub struct Tip(Point, u64); + +impl EncodePayload for Tip { + fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { + e.array(2)?; + self.0.encode_payload(e)?; + e.u64(self.1)?; + + Ok(()) + } +} + +impl DecodePayload for Tip { + fn decode_payload(d: &mut PayloadDecoder) -> Result> { + d.array()?; + let point = Point::decode_payload(d)?; + let block_num = d.u64()?; + + Ok(Tip(point, block_num)) + } +} + +#[derive(Debug, PartialEq, Clone)] +pub enum State { + Idle, + CanAwait, + MustReply, + Intersect, + Done, +} + +#[derive(Debug)] +pub enum Message { + RequestNext, + AwaitReply, + RollForward(WrappedHeader, Tip), + RollBackward(Point, Tip), + FindIntersect(Vec), + IntersectFound(Point, Tip), + IntersectNotFound(Tip), + Done, +} + +impl EncodePayload for Message { + fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { + match self { + Message::RequestNext => { + e.array(1)?.u16(0)?; + Ok(()) + } + Message::AwaitReply => { + e.array(1)?.u16(1)?; + Ok(()) + } + Message::RollForward(header, tip) => { + e.array(3)?.u16(2)?; + e.bytes(&header)?; + tip.encode_payload(e)?; + Ok(()) + } + Message::RollBackward(point, tip) => { + e.array(3)?.u16(3)?; + point.encode_payload(e)?; + tip.encode_payload(e)?; + Ok(()) + } + Message::FindIntersect(points) => { + e.array(2)?.u16(4)?; + e.array(points.len() as u64)?; + for point in points.iter() { + point.encode_payload(e)?; + } + Ok(()) + } + Message::IntersectFound(point, tip) => { + e.array(3)?.u16(5)?; + point.encode_payload(e)?; + tip.encode_payload(e)?; + Ok(()) + } + Message::IntersectNotFound(tip) => { + e.array(1)?.u16(6)?; + tip.encode_payload(e)?; + Ok(()) + } + Message::Done => { + e.array(1)?.u16(7)?; + Ok(()) + } + } + } +} + +impl DecodePayload for Message { + fn decode_payload(d: &mut PayloadDecoder) -> Result> { + d.array()?; + let label = d.u16()?; + + match label { + 0 => Ok(Message::RequestNext), + 1 => Ok(Message::AwaitReply), + 2 => { + warn!("{:?}", d.array()?); + warn!("{:?}", d.u8()?); + warn!("{:?}", d.tag()?); + let header = Vec::from(d.bytes()?); + let tip = Tip::decode_payload(d)?; + Ok(Message::RollForward(header, tip)) + } + 3 => { + let point = Point::decode_payload(d)?; + let tip = Tip::decode_payload(d)?; + Ok(Message::RollBackward(point, tip)) + } + 4 => { + let points_len = d + .array()? + .ok_or(MachineError::UnexpectedCbor("unbounded points array"))?; + let mut points = Vec::with_capacity(points_len as usize); + for i in 0..(points_len - 1) { + points[i as usize] = Point::decode_payload(d)?; + } + Ok(Message::FindIntersect(points)) + } + 5 => { + let point = Point::decode_payload(d)?; + let tip = Tip::decode_payload(d)?; + Ok(Message::IntersectFound(point, tip)) + } + 6 => { + let tip = Tip::decode_payload(d)?; + Ok(Message::IntersectNotFound(tip)) + } + 7 => Ok(Message::Done), + x => Err(Box::new(MachineError::BadLabel(x))), + } + } +} + +#[derive(Debug)] +pub struct Consumer { + pub state: State, + pub known_points: Vec, + pub cursor: Option, + pub tip: Option, +} + +impl Consumer { + pub fn initial(known_points: Vec) -> Self { + Self { + state: State::Idle, + cursor: None, + tip: None, + known_points, + } + } + + fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition { + let msg = Message::FindIntersect(self.known_points.clone()); + + tx.send_msg(&msg)?; + + Ok(Self { + state: State::Intersect, + ..self + }) + } + + fn send_request_next(self, tx: &impl MachineOutput) -> Transition { + let msg = Message::RequestNext; + + tx.send_msg(&msg)?; + + Ok(Self { + state: State::CanAwait, + ..self + }) + } + + fn on_intersect_found(self, point: Point, tip: Tip) -> Transition { + debug!("intersect found: {:?} (tip: {:?})", point, tip); + + Ok(Self { + tip: Some(tip), + cursor: Some(point), + state: State::Idle, + ..self + }) + } + + fn on_intersect_not_found(self, tip: Tip) -> Transition { + debug!("intersect not found (tip: {:?})", tip); + + Ok(Self { + tip: Some(tip), + cursor: None, + state: State::Idle, + ..self + }) + } + + fn on_roll_forward(self, header: Vec, tip: Tip) -> Transition { + debug!("rolling forward: {:?}", header); + + if log_enabled!(log::Level::Trace) { + trace!("header: {}", hex::encode(&header)); + } + + Ok(Self { + tip: Some(tip), + state: State::Idle, + ..self + }) + } + + fn on_roll_backward(self, point: Point, tip: Tip) -> Transition { + debug!("rolling backward to point: {:?}", point); + + Ok(Self { + tip: Some(tip), + cursor: Some(point), + state: State::Idle, + ..self + }) + } +} + +impl Agent for Consumer { + type Message = Message; + + fn is_done(&self) -> bool { + self.state == State::Done + } + + fn has_agency(&self) -> bool { + match self.state { + State::Idle => true, + State::CanAwait => false, + State::MustReply => false, + State::Intersect => false, + State::Done => false, + } + } + + fn send_next(self, tx: &impl MachineOutput) -> Transition { + match self.state { + State::Idle => match self.cursor { + Some(_) => self.send_request_next(tx), + None => self.send_find_intersect(tx), + }, + _ => panic!("I don't have agency, don't know what to do"), + } + } + + fn receive_next(self, msg: Self::Message) -> Transition { + match (&self.state, msg) { + (State::CanAwait, Message::RollForward(header, tip)) => { + self.on_roll_forward(header, tip) + } + (State::CanAwait, Message::RollBackward(point, tip)) => { + self.on_roll_backward(point, tip) + } + (State::MustReply, Message::RollForward(header, tip)) => { + self.on_roll_forward(header, tip) + } + (State::MustReply, Message::RollBackward(point, tip)) => { + self.on_roll_backward(point, tip) + } + (State::Intersect, Message::IntersectFound(point, tip)) => { + self.on_intersect_found(point, tip) + } + (State::Intersect, Message::IntersectNotFound(tip)) => self.on_intersect_not_found(tip), + _ => Err(Box::new(MachineError::InvalidMsgForState)), + } + } +}