diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..869df07 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..726fe03 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,9 @@ +[workspace] + +members = [ + "pallas-multiplexer", + "pallas-machines", + "pallas-handshake", + "pallas-blockfetch", + "pallas", +] \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..4bc3496 --- /dev/null +++ b/README.md @@ -0,0 +1,15 @@ +# Pallas + +Rust-native building blocks for the Cardano blockchain ecosystem. + +## Introduction + +Pallas is an expanding collection of modules that re-implements common +Cardano logic in native Rust. This crate doesn't provide any particular +application, it is meant to be used as a base layer to facilitate the +development of higher-level use-cases, such as explorers, wallets, etc (who +knows, maybe even a full node in the far away future). + +## Etymology + +> Pallas: (Greek mythology) goddess of wisdom and useful arts and prudent warfare; diff --git a/pallas-blockfetch/.gitignore b/pallas-blockfetch/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/pallas-blockfetch/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/pallas-blockfetch/Cargo.toml b/pallas-blockfetch/Cargo.toml new file mode 100644 index 0000000..850d3f8 --- /dev/null +++ b/pallas-blockfetch/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "pallas-blockfetch" +version = "0.1.0" +edition = "2021" +repository = "https://github.com/txpipe/pallas" +license = "Apache-2.0" +authors = [ + "Santiago Carmuega " +] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +pallas-multiplexer = { path = "../pallas-multiplexer/" } +pallas-machines = { path = "../pallas-machines/" } +minicbor = { version="0.11.4", features=["half"] } +minicbor-io = "0.6.0" +log = "0.4.14" + +[dev-dependencies] +net2 = "0.2.37" +env_logger = "0.9.0" +pallas-handshake = { path = "../pallas-handshake/" } +hex = "0.4.3" diff --git a/pallas-blockfetch/examples/client.rs b/pallas-blockfetch/examples/client.rs new file mode 100644 index 0000000..ebd9a5d --- /dev/null +++ b/pallas-blockfetch/examples/client.rs @@ -0,0 +1,47 @@ +use net2::TcpStreamExt; +use std::net::TcpStream; + +use pallas_blockfetch::{BlockFetchClient, Point}; +use pallas_handshake::n2n::{Client, VersionTable}; +use pallas_handshake::MAINNET_MAGIC; +use pallas_machines::run_agent; +use pallas_multiplexer::Multiplexer; + +fn main() { + env_logger::init(); + + //let bearer = TcpStream::connect("localhost:6000").unwrap(); + let bearer = + TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); + + bearer.set_nodelay(true).unwrap(); + bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); + + let mut handles = Multiplexer::new(bearer, &vec![0, 3]).unwrap(); + let (_, rx, tx) = handles.remove(0); + + let versions = VersionTable::v4_and_above(MAINNET_MAGIC); + let last = run_agent(Client::initial(versions), rx, &tx).unwrap(); + println!("{:?}", last); + + let range = ( + Point( + 43847831u64, + hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45") + .unwrap(), + ), + Point( + 43847831u64, + hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45") + .unwrap(), + ), + ); + + let (_, bf_rx, bf_tx) = handles.remove(0); + + let bf = BlockFetchClient::initial(range); + + let bf_last = run_agent(bf, bf_rx, &bf_tx); + + println!("{:?}", bf_last); +} diff --git a/pallas-blockfetch/src/lib.rs b/pallas-blockfetch/src/lib.rs new file mode 100644 index 0000000..5ba865b --- /dev/null +++ b/pallas-blockfetch/src/lib.rs @@ -0,0 +1,190 @@ +use log::info; +use pallas_machines::{ + Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, + PayloadDecoder, PayloadEncoder, Transition, +}; + +#[derive(Clone, Debug)] +pub struct Point(pub u64, pub Vec); + +impl EncodePayload for Point { + fn encode_payload( + &self, + e: &mut PayloadEncoder, + ) -> Result<(), Box> { + e.array(2)?.u64(self.0)?.bytes(&self.1)?; + Ok(()) + } +} + +impl DecodePayload for Point { + fn decode_payload( + d: &mut PayloadDecoder, + ) -> Result> { + d.array()?; + let slot = d.u64()?; + let hash = d.bytes()?; + + Ok(Point(slot, Vec::from(hash))) + } +} + +#[derive(Debug, PartialEq, Clone)] +pub enum State { + Idle, + Busy, + Streaming, + Done, +} + +#[derive(Debug)] +pub enum Message { + RequestRange { range: (Point, Point) }, + ClientDone, + StartBatch, + NoBlocks, + Block { body: Vec }, + BatchDone, +} + +impl EncodePayload for Message { + fn encode_payload( + &self, + e: &mut PayloadEncoder, + ) -> Result<(), Box> { + match self { + Message::RequestRange { range } => { + e.array(3)?.u16(0)?; + range.0.encode_payload(e)?; + range.1.encode_payload(e)?; + Ok(()) + } + Message::ClientDone => { + e.array(1)?.u16(1)?; + Ok(()) + } + Message::StartBatch => { + e.array(1)?.u16(2)?; + Ok(()) + } + Message::NoBlocks => { + e.array(1)?.u16(3)?; + Ok(()) + } + Message::Block { body } => { + e.array(2)?.u16(4)?; + e.bytes(&body)?; + Ok(()) + } + Message::BatchDone => { + e.array(1)?.u16(5)?; + Ok(()) + } + } + } +} + +impl DecodePayload for Message { + fn decode_payload( + d: &mut PayloadDecoder, + ) -> Result> { + d.array()?; + let label = d.u16()?; + + match label { + 0 => { + let point1 = Point::decode_payload(d)?; + let point2 = Point::decode_payload(d)?; + Ok(Message::RequestRange { + range: (point1, point2), + }) + } + 1 => Ok(Message::ClientDone), + 2 => Ok(Message::StartBatch), + 3 => Ok(Message::NoBlocks), + 4 => { + d.tag()?; + let body = d.bytes()?; + Ok(Message::Block { + body: Vec::from(body), + }) + } + 5 => Ok(Message::BatchDone), + x => Err(Box::new(MachineError::BadLabel(x))), + } + } +} + +#[derive(Debug)] +pub struct BlockFetchClient { + pub state: State, + pub range: (Point, Point), +} + +impl BlockFetchClient { + pub fn initial(range: (Point, Point)) -> Self { + Self { + state: State::Idle, + range, + } + } + + fn send_request_range(self, tx: &impl MachineOutput) -> Transition { + let msg = Message::RequestRange { + range: self.range.clone(), + }; + + tx.send_msg(&msg)?; + + Ok(Self { + state: State::Busy, + ..self + }) + } +} + +impl Agent for BlockFetchClient { + type Message = Message; + + fn is_done(&self) -> bool { + self.state == State::Done + } + + fn has_agency(&self) -> bool { + match self.state { + State::Idle => true, + State::Busy => false, + State::Streaming => false, + State::Done => false, + } + } + + fn send_next(self, tx: &impl MachineOutput) -> Transition { + match self.state { + State::Idle => self.send_request_range(tx), + _ => panic!("I don't have agency, don't know what to do"), + } + } + + fn receive_next(self, msg: Self::Message) -> Transition { + match (&self.state, msg) { + (State::Busy, Message::StartBatch) => Ok(Self { + state: State::Streaming, + ..self + }), + (State::Busy, Message::NoBlocks) => Ok(Self { + state: State::Done, + ..self + }), + (State::Streaming, Message::Block { body }) => { + info!("received block body of size {}", body.len()); + Ok(self) + } + (State::Streaming, Message::BatchDone) => Ok(Self { + state: State::Done, + ..self + }), + _ => panic!("I have agency, I don't expect messages"), + } + } +} diff --git a/pallas-handshake/.gitignore b/pallas-handshake/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/pallas-handshake/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/pallas-handshake/Cargo.toml b/pallas-handshake/Cargo.toml new file mode 100644 index 0000000..3b0984b --- /dev/null +++ b/pallas-handshake/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "pallas-handshake" +version = "0.1.0" +edition = "2021" +repository = "https://github.com/txpipe/pallas" +license = "Apache-2.0" +authors = [ + "Santiago Carmuega " +] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +pallas-multiplexer = { path = "../pallas-multiplexer/" } +pallas-machines = { path = "../pallas-machines/" } +minicbor = { version="0.11.4", features=["half"] } +minicbor-io = "0.6.0" +itertools = "0.10.1" +log = "0.4.14" + +[dev-dependencies] +net2 = "0.2.37" +env_logger = "0.9.0" diff --git a/pallas-handshake/examples/client.rs b/pallas-handshake/examples/client.rs new file mode 100644 index 0000000..054d0be --- /dev/null +++ b/pallas-handshake/examples/client.rs @@ -0,0 +1,26 @@ +use net2::TcpStreamExt; +use std::net::TcpStream; + +use pallas_handshake::n2c::{Client, VersionTable}; +use pallas_handshake::MAINNET_MAGIC; +use pallas_machines::run_agent; +use pallas_multiplexer::Multiplexer; + +fn main() { + env_logger::init(); + + //let bearer = TcpStream::connect("localhost:6000").unwrap(); + let bearer = + TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); + + bearer.set_nodelay(true).unwrap(); + bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); + + let mut handles = Multiplexer::new(bearer, &vec![0]).unwrap(); + let (_, rx, tx) = handles.remove(0); + + let versions = VersionTable::v1_and_above(MAINNET_MAGIC); + let last = run_agent(Client::initial(versions), rx, &tx).unwrap(); + + println!("{:?}", last); +} diff --git a/pallas-handshake/examples/node.rs b/pallas-handshake/examples/node.rs new file mode 100644 index 0000000..88aade0 --- /dev/null +++ b/pallas-handshake/examples/node.rs @@ -0,0 +1,26 @@ +use net2::TcpStreamExt; +use std::net::TcpStream; + +use pallas_handshake::n2n::{Client, VersionTable}; +use pallas_handshake::MAINNET_MAGIC; +use pallas_machines::run_agent; +use pallas_multiplexer::Multiplexer; + +fn main() { + env_logger::init(); + + //let bearer = TcpStream::connect("localhost:6000").unwrap(); + let bearer = + TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); + + bearer.set_nodelay(true).unwrap(); + bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); + + let mut handles = Multiplexer::new(bearer, &vec![0]).unwrap(); + let (_, rx, tx) = handles.remove(0); + + let versions = VersionTable::v4_and_above(MAINNET_MAGIC); + let last = run_agent(Client::initial(versions), rx, &tx).unwrap(); + + println!("{:?}", last); +} diff --git a/pallas-handshake/src/common.rs b/pallas-handshake/src/common.rs new file mode 100644 index 0000000..4e1bc73 --- /dev/null +++ b/pallas-handshake/src/common.rs @@ -0,0 +1,80 @@ +use itertools::Itertools; +use pallas_machines::{DecodePayload, EncodePayload, PayloadEncoder}; +use std::{collections::HashMap, fmt::Debug}; + +pub const TESTNET_MAGIC: u64 = 1097911063; +pub const MAINNET_MAGIC: u64 = 764824073; + +#[derive(Debug, Clone)] +pub struct VersionTable +where + T: Debug + Clone + EncodePayload + DecodePayload, +{ + pub values: HashMap, +} + +impl EncodePayload for VersionTable +where + T: Debug + Clone + EncodePayload + DecodePayload, +{ + fn encode_payload( + &self, + e: &mut PayloadEncoder, + ) -> Result<(), Box> { + e.map(self.values.len() as u64)?; + + for key in self.values.keys().sorted() { + e.u64(*key)?; + self.values[key].encode_payload(e)?; + } + + Ok(()) + } +} + +pub type NetworkMagic = u64; + +pub type VersionNumber = u64; + +#[derive(Debug)] +pub enum RefuseReason { + VersionMismatch(Vec), + HandshakeDecodeError(VersionNumber, String), + Refused(VersionNumber, String), +} + +impl EncodePayload for RefuseReason { + fn encode_payload( + &self, + e: &mut PayloadEncoder, + ) -> Result<(), Box> { + match self { + RefuseReason::VersionMismatch(versions) => { + e.array(2)?; + e.u16(0)?; + e.array(versions.len() as u64)?; + for v in versions.iter() { + e.u64(*v)?; + } + + Ok(()) + } + RefuseReason::HandshakeDecodeError(version, msg) => { + e.array(3)?; + e.u16(1)?; + e.u64(*version)?; + e.str(msg)?; + + Ok(()) + } + RefuseReason::Refused(version, msg) => { + e.array(3)?; + e.u16(1)?; + e.u64(*version)?; + e.str(msg)?; + + Ok(()) + } + } + } +} diff --git a/pallas-handshake/src/lib.rs b/pallas-handshake/src/lib.rs new file mode 100644 index 0000000..ad00c1f --- /dev/null +++ b/pallas-handshake/src/lib.rs @@ -0,0 +1,6 @@ +mod common; + +pub mod n2c; +pub mod n2n; + +pub use common::{MAINNET_MAGIC, TESTNET_MAGIC}; diff --git a/pallas-handshake/src/n2c.rs b/pallas-handshake/src/n2c.rs new file mode 100644 index 0000000..6388474 --- /dev/null +++ b/pallas-handshake/src/n2c.rs @@ -0,0 +1,203 @@ +use core::panic; +use std::collections::HashMap; + +use pallas_machines::{ + Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, + PayloadDecoder, PayloadEncoder, +}; + +use crate::common::{NetworkMagic, RefuseReason, VersionNumber}; + +pub type VersionTable = crate::common::VersionTable; + +const PROTOCOL_V1: u64 = 1; +const PROTOCOL_V2: u64 = 32770; +const PROTOCOL_V3: u64 = 32771; +const PROTOCOL_V4: u64 = 32772; +const PROTOCOL_V5: u64 = 32773; +const PROTOCOL_V6: u64 = 32774; +const PROTOCOL_V7: u64 = 32775; +const PROTOCOL_V8: u64 = 32776; +const PROTOCOL_V9: u64 = 32777; +// const PROTOCOL_V10: u64 = 32778; + +impl VersionTable { + pub fn v1_and_above(network_magic: u64) -> VersionTable { + let values = vec![ + (PROTOCOL_V1, VersionData(network_magic)), + (PROTOCOL_V2, VersionData(network_magic)), + (PROTOCOL_V3, VersionData(network_magic)), + (PROTOCOL_V4, VersionData(network_magic)), + (PROTOCOL_V5, VersionData(network_magic)), + (PROTOCOL_V6, VersionData(network_magic)), + (PROTOCOL_V7, VersionData(network_magic)), + (PROTOCOL_V8, VersionData(network_magic)), + (PROTOCOL_V9, VersionData(network_magic)), + ] + .into_iter() + .collect::>(); + + VersionTable { values } + } +} + +#[derive(Debug, Clone)] +pub struct VersionData (NetworkMagic,); + +impl EncodePayload for VersionData { + fn encode_payload( + &self, + e: &mut PayloadEncoder, + ) -> Result<(), Box> { + e.u64(self.0)?; + + Ok(()) + } +} + +impl DecodePayload for VersionData { + fn decode_payload( + d: &mut PayloadDecoder, + ) -> Result> { + let network_magic = d.u64()?; + + Ok(Self(network_magic)) + } +} + +#[derive(Debug)] +pub enum Message { + Propose(VersionTable), + Accept(VersionNumber, VersionData), + Refuse(RefuseReason), +} + +impl EncodePayload for Message { + fn encode_payload( + &self, + e: &mut PayloadEncoder, + ) -> Result<(), Box> { + match self { + Message::Propose(version_table) => { + e.array(2)?.u16(0)?; + version_table.encode_payload(e)?; + } + Message::Accept(version_number, version_data) => { + e.array(3)?.u16(1)?; + e.u64(*version_number)?; + version_data.encode_payload(e)?; + } + Message::Refuse(reason) => { + e.array(2)?.u16(2)?; + reason.encode_payload(e)?; + } + }; + + Ok(()) + } +} + +impl DecodePayload for Message { + fn decode_payload( + d: &mut PayloadDecoder, + ) -> Result> { + d.array()?; + + let msg = match d.u16()? { + 0 => todo!(), + 1 => { + let version_number = d.u64()?; + let version_data = VersionData::decode_payload(d)?; + + Message::Accept(version_number, version_data) + } + 2 => todo!(), + x => return Err(Box::new(MachineError::BadLabel(x))), + }; + + Ok(msg) + } +} + +#[derive(Debug, PartialEq, Eq)] +pub enum State { + Propose, + Confirm, + Done, +} + +#[derive(Debug)] +pub enum Output { + Pending, + Accepted(VersionNumber, VersionData), + Refused(RefuseReason), +} + +#[derive(Debug)] +pub struct Client { + state: State, + output: Output, + version_table: VersionTable, +} + +impl Client { + pub fn initial(version_table: VersionTable) -> Self { + Client { + state: State::Propose, + output: Output::Pending, + version_table, + } + } +} + +impl Agent for Client { + type Message = Message; + + fn is_done(&self) -> bool { + self.state == State::Done + } + + fn has_agency(&self) -> bool { + match self.state { + State::Propose => true, + State::Confirm => false, + State::Done => false, + } + } + + fn send_next( + self, + tx: &impl MachineOutput, + ) -> Result> { + match self.state { + State::Propose => { + tx.send_msg(&Message::Propose(self.version_table.clone()))?; + + Ok(Self { + state: State::Confirm, + ..self + }) + } + _ => panic!("I don't have agency, nothing to send"), + } + } + + fn receive_next( + self, + msg: Self::Message, + ) -> Result> { + match (self.state, msg) { + (State::Confirm, Message::Accept(version, data)) => Ok(Self { + state: State::Done, + output: Output::Accepted(version, data), + ..self + }), + (State::Confirm, Message::Refuse(reason)) => Ok(Self { + state: State::Done, + output: Output::Refused(reason), + ..self + }), + _ => panic!("Current state does't expect to receive a message"), + } + } +} diff --git a/pallas-handshake/src/n2n.rs b/pallas-handshake/src/n2n.rs new file mode 100644 index 0000000..3c2e62a --- /dev/null +++ b/pallas-handshake/src/n2n.rs @@ -0,0 +1,214 @@ +use core::panic; +use std::collections::HashMap; + +use pallas_machines::{ + Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, + PayloadDecoder, PayloadEncoder, +}; + +use crate::common::{RefuseReason, VersionNumber}; + +pub type VersionTable = crate::common::VersionTable; + +const PROTOCOL_V4: u64 = 4; +const PROTOCOL_V5: u64 = 5; +const PROTOCOL_V6: u64 = 6; +const PROTOCOL_V7: u64 = 7; + +impl VersionTable { + pub fn v4_and_above(network_magic: u64) -> VersionTable { + let values = vec![ + (PROTOCOL_V4, VersionData::new(network_magic, false)), + (PROTOCOL_V5, VersionData::new(network_magic, false)), + (PROTOCOL_V6, VersionData::new(network_magic, false)), + (PROTOCOL_V7, VersionData::new(network_magic, false)), + ] + .into_iter() + .collect::>(); + + VersionTable { values } + } +} + +#[derive(Debug, Clone)] +pub struct VersionData { + network_magic: u64, + initiator_and_responder_diffusion_mode: bool, +} + +impl VersionData { + pub fn new( + network_magic: u64, + initiator_and_responder_diffusion_mode: bool, + ) -> Self { + VersionData { + network_magic, + initiator_and_responder_diffusion_mode, + } + } +} + +impl EncodePayload for VersionData { + fn encode_payload( + &self, + e: &mut PayloadEncoder, + ) -> Result<(), Box> { + e.array(2)? + .u64(self.network_magic)? + .bool(self.initiator_and_responder_diffusion_mode)?; + + Ok(()) + } +} + +impl DecodePayload for VersionData { + fn decode_payload( + d: &mut PayloadDecoder, + ) -> Result> { + d.array()?; + let network_magic = d.u64()?; + let initiator_and_responder_diffusion_mode = d.bool()?; + + Ok(Self { + network_magic, + initiator_and_responder_diffusion_mode, + }) + } +} + +#[derive(Debug)] +pub enum Message { + Propose(VersionTable), + Accept(VersionNumber, VersionData), + Refuse(RefuseReason), +} + +impl EncodePayload for Message { + fn encode_payload( + &self, + e: &mut PayloadEncoder, + ) -> Result<(), Box> { + match self { + Message::Propose(version_table) => { + e.array(2)?.u16(0)?; + version_table.encode_payload(e)?; + } + Message::Accept(version_number, version_data) => { + e.array(3)?.u16(1)?; + e.u64(*version_number)?; + version_data.encode_payload(e)?; + } + Message::Refuse(reason) => { + e.array(2)?.u16(2)?; + reason.encode_payload(e)?; + } + }; + + Ok(()) + } +} + +impl DecodePayload for Message { + fn decode_payload( + d: &mut PayloadDecoder, + ) -> Result> { + d.array()?; + + let msg = match d.u16()? { + 0 => todo!(), + 1 => { + let version_number = d.u64()?; + let version_data = VersionData::decode_payload(d)?; + + Message::Accept(version_number, version_data) + } + 2 => todo!(), + x => return Err(Box::new(MachineError::BadLabel(x))), + }; + + Ok(msg) + } +} + +#[derive(Debug, PartialEq, Eq)] +pub enum State { + Propose, + Confirm, + Done, +} + +#[derive(Debug)] +pub enum Output { + Pending, + Accepted(VersionNumber, VersionData), + Refused(RefuseReason), +} + +#[derive(Debug)] +pub struct Client { + state: State, + output: Output, + version_table: VersionTable, +} + +impl Client { + pub fn initial(version_table: VersionTable) -> Self { + Client { + state: State::Propose, + output: Output::Pending, + version_table, + } + } +} + +impl Agent for Client { + type Message = Message; + + fn is_done(&self) -> bool { + self.state == State::Done + } + + fn has_agency(&self) -> bool { + match self.state { + State::Propose => true, + State::Confirm => false, + State::Done => false, + } + } + + fn send_next( + self, + tx: &impl MachineOutput, + ) -> Result> { + match self.state { + State::Propose => { + tx.send_msg(&Message::Propose(self.version_table.clone()))?; + + Ok(Self { + state: State::Confirm, + ..self + }) + } + _ => panic!("I don't have agency, nothing to send"), + } + } + + fn receive_next( + self, + msg: Self::Message, + ) -> Result> { + match (self.state, msg) { + (State::Confirm, Message::Accept(version, data)) => Ok(Self { + state: State::Done, + output: Output::Accepted(version, data), + ..self + }), + (State::Confirm, Message::Refuse(reason)) => Ok(Self { + state: State::Done, + output: Output::Refused(reason), + ..self + }), + _ => panic!("Current state does't expect to receive a message"), + } + } +} diff --git a/pallas-machines/.gitignore b/pallas-machines/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/pallas-machines/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/pallas-machines/Cargo.toml b/pallas-machines/Cargo.toml new file mode 100644 index 0000000..dd7dacd --- /dev/null +++ b/pallas-machines/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "pallas-machines" +version = "0.1.0" +edition = "2021" +repository = "https://github.com/txpipe/pallas" +license = "Apache-2.0" +authors = [ + "Santiago Carmuega " +] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +pallas-multiplexer = { path = "../pallas-multiplexer/" } +minicbor = { version="0.11.4", features=["half"] } +log = "0.4.14" diff --git a/pallas-machines/src/lib.rs b/pallas-machines/src/lib.rs new file mode 100644 index 0000000..eb424f4 --- /dev/null +++ b/pallas-machines/src/lib.rs @@ -0,0 +1,174 @@ +use log::{debug, trace, warn}; +use minicbor::{Decoder, Encoder}; +use pallas_multiplexer::Payload; +use std::borrow::Borrow; +use std::fmt::{Debug, Display}; +use std::sync::mpsc::{Receiver, Sender}; + +#[derive(Debug)] +pub enum MachineError { + BadLabel(u16), +} + +impl Display for MachineError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + MachineError::BadLabel(label) => { + write!(f, "unknown message label [{}]", label) + } + } + } +} + +impl std::error::Error for MachineError {} + +pub type PayloadEncoder<'a> = Encoder<&'a mut Vec>; + +pub type PayloadDecoder<'a> = Decoder<'a>; + +pub trait EncodePayload { + fn encode_payload( + &self, + e: &mut PayloadEncoder, + ) -> Result<(), Box>; +} + +pub fn to_payload( + data: &dyn EncodePayload, +) -> Result> { + let mut payload = Vec::new(); + let mut encoder = minicbor::encode::Encoder::new(&mut payload); + data.encode_payload(&mut encoder)?; + + Ok(payload) +} + +pub struct Message +where + D: EncodePayload, +{ + pub label: u32, + pub data: D, +} + +impl EncodePayload for Message +where + D: EncodePayload, +{ + fn encode_payload( + &self, + e: &mut PayloadEncoder, + ) -> Result<(), Box> { + // TODO: map concrete error to W::Error somehow? + // or just implement custom error struct + let data = to_payload(&self.data).unwrap(); + + e.array(2)?.u32(self.label)?.bytes(&data)?; + + Ok(()) + } +} + +pub trait MachineOutput { + fn send_msg( + &self, + data: &impl EncodePayload, + ) -> Result<(), Box>; +} + +impl MachineOutput for Sender { + fn send_msg( + &self, + data: &impl EncodePayload, + ) -> Result<(), Box> { + let payload = to_payload(data.borrow())?; + self.send(payload)?; + + Ok(()) + } +} + +pub trait DecodePayload: Sized { + fn decode_payload( + d: &mut PayloadDecoder, + ) -> Result>; +} + +pub struct PayloadDeconstructor { + rx: Receiver, + remaining: Vec, +} + +impl PayloadDeconstructor { + pub fn consume_next_message( + &mut self, + ) -> Result> { + if self.remaining.len() == 0 { + debug!("no remaining payload, fetching next segment"); + let payload = self.rx.recv()?; + self.remaining.extend(payload); + } + + let mut decoder = minicbor::Decoder::new(&self.remaining); + + match T::decode_payload(&mut decoder) { + Ok(t) => { + let new_pos = decoder.position(); + self.remaining.drain(0..new_pos); + debug!("consumed {} from payload buffer", new_pos); + Ok(t) + } + Err(err) => { + //TODO: we need to filter this only for correct errors + warn!("{:?}", err); + + debug!("payload incomplete, fetching next segment"); + let payload = self.rx.recv()?; + self.remaining.extend(payload); + + self.consume_next_message::() + } + } + } +} + +pub type Transition = Result>; + +pub trait Agent: Sized { + type Message: DecodePayload + Debug; + + fn is_done(&self) -> bool; + fn has_agency(&self) -> bool; + fn send_next(self, tx: &impl MachineOutput) -> Transition; + fn receive_next(self, msg: Self::Message) -> Transition; +} + +pub fn run_agent( + agent: T, + rx: Receiver, + output: &impl MachineOutput, +) -> Result> { + let mut input = PayloadDeconstructor { + rx, + remaining: Vec::new(), + }; + + let mut agent = agent; + + while !agent.is_done() { + debug!("evaluating agent {:?}", agent); + + match agent.has_agency() { + true => { + agent = agent.send_next(output)?; + } + false => { + let msg = input.consume_next_message::()?; + trace!("procesing inbound msg: {:?}", msg); + agent = agent.receive_next(msg)?; + } + } + } + + Ok(agent) +} diff --git a/pallas-multiplexer/.gitignore b/pallas-multiplexer/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/pallas-multiplexer/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/pallas-multiplexer/Cargo.toml b/pallas-multiplexer/Cargo.toml new file mode 100644 index 0000000..31a6738 --- /dev/null +++ b/pallas-multiplexer/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "pallas-multiplexer" +version = "0.1.0" +edition = "2021" +repository = "https://github.com/txpipe/pallas" +license = "Apache-2.0" +authors = [ + "Santiago Carmuega " +] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +log = "0.4.14" +byteorder = "1.4.3" +hex = "0.4.3" + +[dev-dependencies] +env_logger = "0.9.0" diff --git a/pallas-multiplexer/examples/listener.rs b/pallas-multiplexer/examples/listener.rs new file mode 100644 index 0000000..9b452da --- /dev/null +++ b/pallas-multiplexer/examples/listener.rs @@ -0,0 +1,27 @@ +use std::{net::TcpListener, thread, time::Duration}; + +use pallas_multiplexer::Multiplexer; + +fn main() { + env_logger::init(); + + let server = TcpListener::bind("0.0.0.0:3001").unwrap(); + let (bearer, _) = server.accept().unwrap(); + + let handles = + Multiplexer::new(bearer, &vec![0x8002u16, 0x8003u16][..]).unwrap(); + + for handle in handles { + thread::spawn(move || { + let (id, rx, tx) = handle; + loop { + let payload = rx.recv().unwrap(); + println!("id:{}, length:{}", id, payload.len()); + } + }); + } + + loop { + thread::sleep(Duration::from_secs(6000)); + } +} diff --git a/pallas-multiplexer/examples/sender.rs b/pallas-multiplexer/examples/sender.rs new file mode 100644 index 0000000..1788ecd --- /dev/null +++ b/pallas-multiplexer/examples/sender.rs @@ -0,0 +1,29 @@ +use std::{net::TcpStream, thread, time::Duration}; + +use pallas_multiplexer::Multiplexer; + +fn main() { + env_logger::init(); + + let bearer = TcpStream::connect("127.0.0.1:3001").unwrap(); + let handles = + Multiplexer::new(bearer, &vec![0x0002u16, 0x0003u16][..]).unwrap(); + + for (idx, handle) in handles.into_iter().enumerate() { + thread::spawn(move || { + let (id, rx, tx) = handle; + + loop { + let payload = vec![1; 65545]; + tx.send(payload).unwrap(); + thread::sleep(Duration::from_millis( + 50u64 + (idx as u64 * 10u64), + )); + } + }); + } + + loop { + thread::sleep(Duration::from_secs(6000)); + } +} diff --git a/pallas-multiplexer/src/lib.rs b/pallas-multiplexer/src/lib.rs new file mode 100644 index 0000000..3775ffb --- /dev/null +++ b/pallas-multiplexer/src/lib.rs @@ -0,0 +1,215 @@ +use std::{ + collections::HashMap, + io::{Read, Write}, + net::TcpStream, + sync::mpsc::{self, Receiver, Sender, TryRecvError}, + thread, + time::{Duration, Instant}, +}; + +use byteorder::{ByteOrder, NetworkEndian, WriteBytesExt}; +use log::{debug, error, log_enabled, trace, warn}; + +pub trait Bearer: Read + Write + Send + Sync + Sized { + fn read_segment(&mut self) -> Result<(u16, u32, Payload), std::io::Error>; + + fn write_segment( + &mut self, + clock: Instant, + protocol_id: u16, + partial_payload: &[u8], + ) -> Result<(), std::io::Error>; + + fn clone(&self) -> Self; +} + +impl Bearer for TcpStream { + fn write_segment( + &mut self, + clock: Instant, + protocol_id: u16, + payload: &[u8], + ) -> Result<(), std::io::Error> { + let mut msg = Vec::new(); + msg.write_u32::(clock.elapsed().as_micros() as u32)?; + msg.write_u16::(protocol_id)?; + msg.write_u16::(payload.len() as u16)?; + + if log_enabled!(log::Level::Trace) { + trace!( + "sending segment, header {:?}, payload length: {}", + hex::encode(&msg), + payload.len() + ); + } + + msg.write(&payload[..]).unwrap(); + + self.write(&msg)?; + + self.flush() + } + + fn read_segment(&mut self) -> Result<(u16, u32, Payload), std::io::Error> { + let mut header = [0u8; 8]; + + self.read_exact(&mut header)?; + + let length = NetworkEndian::read_u16(&header[6..]) as usize; + let mut payload = vec![0u8; length]; + self.read_exact(&mut payload)?; + + let id = NetworkEndian::read_u16(&mut header[4..6]) as usize ^ 0x8000; + let ts = NetworkEndian::read_u32(&mut header[0..4]); + + if log_enabled!(log::Level::Trace) { + trace!( + "received segment, header: {:?}, payload length: {}", + hex::encode(&header), + payload.len() + ); + } + + Ok((id as u16, ts, payload)) + } + + fn clone(&self) -> Self { + self.try_clone().unwrap() + } +} + +const MAX_SEGMENT_PAYLOAD_LENGTH: usize = 65535; + +pub type Payload = Vec; + +#[derive(Debug)] +pub struct Error {} + +fn tx_round( + bearer: &mut TBearer, + ingress: &MuxIngress, + clock: Instant, +) -> Result +where + TBearer: Bearer, +{ + let mut writes = 0u16; + + for (id, rx) in ingress.iter() { + match rx.try_recv() { + Ok(payload) => { + let chunks = payload.chunks(MAX_SEGMENT_PAYLOAD_LENGTH); + + for chunk in chunks { + bearer.write_segment(clock, *id, chunk)?; + writes += 1; + } + } + Err(TryRecvError::Disconnected) => { + //TODO: remove handle from list + warn!("protocol handle disconnected"); + } + Err(TryRecvError::Empty) => (), + }; + } + + Ok(writes) +} + +fn tx_loop(bearer: &mut TBearer, ingress: MuxIngress) +where + TBearer: Bearer, +{ + loop { + let clock = Instant::now(); + match tx_round(bearer, &ingress, clock) { + Err(err) => { + error!("{:?}", err); + panic!(); + } + Ok(0) => thread::sleep(Duration::from_millis(10)), + Ok(_) => (), + }; + } +} + +fn rx_loop(bearer: &mut TBearer, egress: DemuxerEgress) +where + TBearer: Bearer, +{ + let mut tx_map: HashMap<_, _> = egress.into_iter().collect(); + + loop { + match bearer.read_segment() { + Err(err) => { + error!("{:?}", err); + panic!(); + } + Ok(segment) => { + let (id, _ts, payload) = segment; + match tx_map.get(&id) { + Some(tx) => match tx.send(payload) { + Err(err) => { + error!("error sending egress tx to protocol, removing protocol from egress output. {:?}", err); + tx_map.remove(&id); + } + Ok(_) => { + debug!("successful tx to egress protocol"); + } + }, + None => warn!( + "received segment for protocol id not being demuxed {}", + id + ), + } + } + } + } +} + +type ChannelProtocolHandle = (u16, Receiver, Sender); +type ChannelIngressHandle = (u16, Receiver); +type ChannelEgressHandle = (u16, Sender); +type MuxIngress = Vec; +type DemuxerEgress = Vec; + +pub struct Multiplexer {} + +impl Multiplexer { + pub fn new( + bearer: TBearer, + protocols: &[u16], + ) -> Result, Error> + where + TBearer: Bearer + 'static, + { + let handles = protocols + .iter() + .map(|id| { + let (demux_tx, demux_rx) = mpsc::channel::(); + let (mux_tx, mux_rx) = mpsc::channel::(); + + let protocol_handle: ChannelProtocolHandle = + (*id, demux_rx, mux_tx); + let ingress_handle: ChannelIngressHandle = (*id, mux_rx); + let egress_handle: ChannelEgressHandle = (*id, demux_tx); + + (protocol_handle, (ingress_handle, egress_handle)) + }) + .collect::>(); + + let (protocol_handles, multiplex_handles): (Vec<_>, Vec<_>) = + handles.into_iter().unzip(); + + let (ingress, egress): (Vec<_>, Vec<_>) = + multiplex_handles.into_iter().unzip(); + + let mut tx_bearer = bearer.clone(); + thread::spawn(move || tx_loop(&mut tx_bearer, ingress)); + + let mut rx_bearer = bearer.clone(); + thread::spawn(move || rx_loop(&mut rx_bearer, egress)); + + Ok(protocol_handles) + } +} diff --git a/pallas/.gitignore b/pallas/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/pallas/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/pallas/Cargo.toml b/pallas/Cargo.toml new file mode 100644 index 0000000..aed51b7 --- /dev/null +++ b/pallas/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "pallas" +version = "0.1.0" +edition = "2021" +repository = "https://github.com/txpipe/pallas" +license = "Apache-2.0" +authors = [ + "Santiago Carmuega " +] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +pallas-multiplexer = { path = "../pallas-multiplexer/" } +pallas-machines = { path = "../pallas-machines/" } +pallas-handshake = { path = "../pallas-handshake/" } +pallas-blockfetch = { path = "../pallas-blockfetch/" } \ No newline at end of file diff --git a/pallas/src/lib.rs b/pallas/src/lib.rs new file mode 100644 index 0000000..ccd4698 --- /dev/null +++ b/pallas/src/lib.rs @@ -0,0 +1,12 @@ +//! Rust-native building blocks for the Cardano blockchain ecosystem +//! +//! Pallas is an expanding collection of modules that re-implements common +//! Cardano logic in native Rust. This crate doesn't provide any particular +//! application, it is meant to be used as a base layer to facilitate the +//! development of higher-level use-cases, such as explorers, wallets, etc (who +//! knows, maybe even a full node in the far away future). + +#![warn(missing_docs)] +#![warn(missing_doc_code_examples)] + +pub mod ouroboros; diff --git a/pallas/src/ouroboros.rs b/pallas/src/ouroboros.rs new file mode 100644 index 0000000..739e843 --- /dev/null +++ b/pallas/src/ouroboros.rs @@ -0,0 +1,12 @@ + +#[doc(inline)] +pub use pallas_multiplexer as multiplexer; + +#[doc(inline)] +pub use pallas_machines as machines; + +#[doc(inline)] +pub use pallas_handshake as handshake; + +#[doc(inline)] +pub use pallas_blockfetch as blockfetch; diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..649d5e1 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,2 @@ +edition = "2021" +wrap_comments = true \ No newline at end of file